mirror of https://github.com/apache/cloudstack.git
247 lines
8.5 KiB
Java
247 lines
8.5 KiB
Java
// Licensed to the Apache Software Foundation (ASF) under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing,
|
|
// software distributed under the License is distributed on an
|
|
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
// KIND, either express or implied. See the License for the
|
|
// specific language governing permissions and limitations
|
|
// under the License.
|
|
package com.cloud.async;
|
|
|
|
import java.sql.SQLException;
|
|
import java.sql.Statement;
|
|
import java.util.Date;
|
|
import java.util.List;
|
|
|
|
import javax.inject.Inject;
|
|
|
|
import junit.framework.TestCase;
|
|
|
|
import org.junit.After;
|
|
import org.junit.Assert;
|
|
import org.junit.Before;
|
|
import org.junit.Test;
|
|
import org.junit.runner.RunWith;
|
|
import org.mockito.Mockito;
|
|
import org.springframework.test.context.ContextConfiguration;
|
|
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
|
|
|
import org.apache.cloudstack.config.ConfigDepot;
|
|
import org.apache.cloudstack.framework.jobs.AsyncJob;
|
|
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
|
|
import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao;
|
|
import org.apache.cloudstack.framework.jobs.dao.AsyncJobJournalDao;
|
|
import org.apache.cloudstack.framework.jobs.dao.SyncQueueDao;
|
|
import org.apache.cloudstack.framework.jobs.dao.SyncQueueItemDao;
|
|
import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
|
|
import org.apache.cloudstack.framework.jobs.impl.AsyncJobJournalVO;
|
|
import org.apache.cloudstack.framework.jobs.impl.AsyncJobMonitor;
|
|
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
|
|
import org.apache.cloudstack.framework.jobs.impl.SyncQueueItemVO;
|
|
import org.apache.cloudstack.framework.jobs.impl.SyncQueueVO;
|
|
import org.apache.cloudstack.framework.messagebus.MessageBus;
|
|
import org.apache.cloudstack.framework.messagebus.PublishScope;
|
|
import org.apache.cloudstack.jobs.JobInfo;
|
|
|
|
import com.cloud.cluster.ClusterManager;
|
|
import com.cloud.user.Account;
|
|
import com.cloud.user.AccountManager;
|
|
import com.cloud.user.AccountVO;
|
|
import com.cloud.user.User;
|
|
import com.cloud.user.UserVO;
|
|
import com.cloud.utils.Predicate;
|
|
import com.cloud.utils.component.ComponentContext;
|
|
import com.cloud.utils.db.Transaction;
|
|
|
|
@RunWith(SpringJUnit4ClassRunner.class)
|
|
@ContextConfiguration(locations="classpath:/AsyncJobTestContext.xml")
|
|
public class TestAsyncJobManager extends TestCase {
|
|
|
|
@Inject AsyncJobManager asyncMgr;
|
|
@Inject ClusterManager clusterMgr;
|
|
@Inject MessageBus messageBus;
|
|
@Inject AsyncJobMonitor jobMonitor;
|
|
@Inject AsyncJobJournalDao journalDao;
|
|
@Inject AsyncJobJoinMapDao joinMapDao;
|
|
@Inject AccountManager accountMgr;
|
|
@Inject SyncQueueDao syncQueueDao;
|
|
@Inject SyncQueueItemDao syncQueueItemDao;
|
|
@Inject ConfigDepot configDepot;
|
|
|
|
@Override
|
|
@Before
|
|
public void setUp() {
|
|
ComponentContext.initComponentsLifeCycle();
|
|
Mockito.when(clusterMgr.getManagementNodeId()).thenReturn(1L);
|
|
|
|
AccountVO account = new AccountVO();
|
|
Mockito.when(accountMgr.getSystemAccount()).thenReturn(account);
|
|
UserVO user = new UserVO();
|
|
Mockito.when(accountMgr.getSystemUser()).thenReturn(user);
|
|
|
|
Transaction.open("dummy");
|
|
|
|
// drop constraint check in order to do single table test
|
|
Statement stat = null;
|
|
try {
|
|
stat = Transaction.currentTxn().getConnection().createStatement();
|
|
stat.execute("SET foreign_key_checks = 0;");
|
|
} catch (SQLException e) {
|
|
} finally {
|
|
if(stat != null) {
|
|
try {
|
|
stat.close();
|
|
} catch (SQLException e) {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
@Override
|
|
@After
|
|
public void tearDown() {
|
|
Transaction.currentTxn().close();
|
|
}
|
|
|
|
@Test
|
|
public void testJobJournal() {
|
|
AsyncJobJournalVO journal = new AsyncJobJournalVO();
|
|
journal.setJobId(1L);
|
|
journal.setJournalType(AsyncJob.JournalType.SUCCESS);
|
|
journal.setJournalText("Journal record 1");
|
|
|
|
journalDao.persist(journal);
|
|
|
|
AsyncJobJournalVO journal2 = new AsyncJobJournalVO();
|
|
journal2.setJobId(1L);
|
|
journal2.setJournalType(AsyncJob.JournalType.SUCCESS);
|
|
journal2.setJournalText("Journal record 2");
|
|
|
|
journalDao.persist(journal2);
|
|
|
|
List<AsyncJobJournalVO> l = journalDao.getJobJournal(1L);
|
|
Assert.assertTrue(l.size() == 2);
|
|
journal = l.get(0);
|
|
Assert.assertTrue(journal.getJournalText().equals("Journal record 1"));
|
|
|
|
journal2 = l.get(1);
|
|
Assert.assertTrue(journal2.getJournalText().equals("Journal record 2"));
|
|
|
|
journalDao.expunge(journal.getId());
|
|
journalDao.expunge(journal2.getId());
|
|
}
|
|
|
|
@Test
|
|
public void testJoinMapDao() {
|
|
joinMapDao.joinJob(2, 1, 100, 3000, 120000, null, "wakeupHandler", "wakeupDispatcher");
|
|
joinMapDao.joinJob(3, 1, 100, 5000, 120000, null, "wakeupHandler", "wakeupDispatcher");
|
|
|
|
AsyncJobJoinMapVO record = joinMapDao.getJoinRecord(2, 1);
|
|
Assert.assertTrue(record != null);
|
|
Assert.assertTrue(record.getJoinMsid() == 100);
|
|
Assert.assertTrue(record.getJoinStatus() == JobInfo.Status.IN_PROGRESS);
|
|
|
|
joinMapDao.completeJoin(1, JobInfo.Status.SUCCEEDED, "Done", 101);
|
|
|
|
record = joinMapDao.getJoinRecord(2, 1);
|
|
Assert.assertTrue(record != null);
|
|
Assert.assertTrue(record.getJoinMsid() == 100);
|
|
Assert.assertTrue(record.getJoinStatus() == JobInfo.Status.SUCCEEDED);
|
|
Assert.assertTrue(record.getJoinResult().equals("Done"));
|
|
Assert.assertTrue(record.getCompleteMsid() == 101);
|
|
|
|
record = joinMapDao.getJoinRecord(3, 1);
|
|
Assert.assertTrue(record != null);
|
|
Assert.assertTrue(record.getJoinMsid() == 100);
|
|
Assert.assertTrue(record.getJoinStatus() == JobInfo.Status.SUCCEEDED);
|
|
Assert.assertTrue(record.getJoinResult().equals("Done"));
|
|
Assert.assertTrue(record.getCompleteMsid() == 101);
|
|
|
|
joinMapDao.disjoinJob(2, 1);
|
|
joinMapDao.disjoinJob(3, 1);
|
|
}
|
|
|
|
@Test
|
|
public void testJoinWakeup() {
|
|
joinMapDao.joinJob(2, 1, 100, 3000, 120000, null, "wakeupHandler", "wakeupDispatcher");
|
|
joinMapDao.joinJob(3, 1, 100, 5000, 120000, null, "wakeupHandler", "wakeupDispatcher");
|
|
|
|
SyncQueueVO queue = new SyncQueueVO();
|
|
queue.setCreated(new Date());
|
|
queue.setLastProcessNumber(1L);
|
|
queue.setLastUpdated(new Date());
|
|
queue.setQueueSizeLimit(1);
|
|
queue.setSyncObjType("AsynJob");
|
|
queue.setSyncObjId(1L);
|
|
syncQueueDao.persist(queue);
|
|
|
|
SyncQueueItemVO queueItem = new SyncQueueItemVO();
|
|
queueItem.setQueueId(queue.getId());
|
|
queueItem.setContentId(2L);
|
|
queueItem.setContentType("AsyncJob");
|
|
queueItem.setLastProcessMsid(1L);
|
|
queueItem.setLastProcessNumber(1L);
|
|
syncQueueItemDao.persist(queueItem);
|
|
Assert.assertTrue(queueItem.getId() != 0);
|
|
|
|
joinMapDao.wakeupScan();
|
|
|
|
joinMapDao.disjoinJob(2, 1);
|
|
joinMapDao.disjoinJob(3, 1);
|
|
|
|
syncQueueItemDao.expunge(queueItem.getId());
|
|
syncQueueDao.expunge(queue.getId());
|
|
}
|
|
|
|
@Test
|
|
public void testPseudoJob() {
|
|
AsyncJob job = asyncMgr.getPseudoJob(Account.ACCOUNT_ID_SYSTEM, User.UID_SYSTEM);
|
|
Assert.assertTrue(job.getInstanceType().equals(AsyncJobVO.PSEUDO_JOB_INSTANCE_TYPE));
|
|
Assert.assertTrue(job.getInstanceId().longValue() == Thread.currentThread().getId());
|
|
}
|
|
|
|
@Test
|
|
public void testWaitAndCheck() {
|
|
Thread thread = new Thread(new Runnable() {
|
|
@Override
|
|
public void run() {
|
|
for(int i = 0; i < 2; i++) {
|
|
try {
|
|
Thread.sleep(1000);
|
|
} catch (InterruptedException e) {
|
|
}
|
|
System.out.println("Publish wakeup message");
|
|
messageBus.publish(null, "VM", PublishScope.GLOBAL, null);
|
|
}
|
|
}
|
|
});
|
|
thread.start();
|
|
|
|
jobMonitor.registerActiveTask(1, 1);
|
|
|
|
asyncMgr.waitAndCheck(new AsyncJobVO(), new String[] {"VM"}, 5000L, 10000L, new Predicate() {
|
|
@Override
|
|
public boolean checkCondition() {
|
|
System.out.println("Check condition to exit");
|
|
messageBus.publish(null, AsyncJob.Topics.JOB_HEARTBEAT, PublishScope.LOCAL, 1L);
|
|
return false;
|
|
}
|
|
});
|
|
|
|
jobMonitor.unregisterActiveTask(1);
|
|
|
|
try {
|
|
thread.join();
|
|
} catch(InterruptedException e) {
|
|
}
|
|
}
|
|
}
|