server/test/com/cloud/async/AsyncJobTestConfiguration.java

This commit is contained in:
Kelven Yang 2013-04-19 21:55:03 -07:00
parent c51fae0b6c
commit 96ebac1e47
10 changed files with 56 additions and 782 deletions

View File

@ -86,7 +86,6 @@ import com.cloud.api.query.vo.UserVmJoinVO;
import com.cloud.api.query.vo.VolumeJoinVO;
import com.cloud.async.AsyncJob;
import com.cloud.async.AsyncJobManager;
import com.cloud.async.AsyncJobVO;
import com.cloud.async.dao.AsyncJobDao;
import com.cloud.capacity.CapacityVO;
import com.cloud.capacity.dao.CapacityDao;
@ -661,10 +660,6 @@ public class ApiDBUtils {
return _resourceLimitMgr.findCorrectResourceLimitForAccount(accountType, limit, type);
}
public static AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId) {
return _asyncMgr.findInstancePendingAsyncJob(instanceType, instanceId);
}
public static long getResourceCount(ResourceType type, long accountId) {
AccountVO account = _accountDao.findById(accountId);

View File

@ -24,7 +24,6 @@ import com.cloud.utils.component.Manager;
public interface AsyncJobManager extends Manager {
public AsyncJobVO getAsyncJob(long jobId);
public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId);
public List<? extends AsyncJob> findInstancePendingAsyncJobs(AsyncJob.Type instanceType, Long accountId);

View File

@ -52,7 +52,6 @@ import com.cloud.user.Account;
import com.cloud.user.AccountManager;
import com.cloud.user.User;
import com.cloud.user.UserContext;
import com.cloud.user.dao.AccountDao;
import com.cloud.utils.DateUtil;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.PropertiesUtil;
@ -79,7 +78,6 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
@Inject private SyncQueueManager _queueMgr;
@Inject private ClusterManager _clusterMgr;
@Inject private AccountManager _accountMgr;
@Inject private AccountDao _accountDao;
@Inject private AsyncJobDao _jobDao;
@Inject private ConfigurationDao _configDao;
@Inject private List<AsyncJobDispatcher> _jobDispatchers;
@ -105,11 +103,6 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
return _jobDao.findById(jobId);
}
@Override
public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId) {
return _jobDao.findInstancePendingAsyncJob(instanceType, instanceId);
}
@Override
public List<AsyncJobVO> findInstancePendingAsyncJobs(AsyncJob.Type instanceType, Long accountId) {
return _jobDao.findInstancePendingAsyncJobs(instanceType, accountId);

View File

@ -43,7 +43,7 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long>
private static final Logger s_logger = Logger.getLogger(SyncQueueItemDaoImpl.class);
final GenericSearchBuilder<SyncQueueItemVO, Long> queueIdSearch;
protected SyncQueueItemDaoImpl() {
public SyncQueueItemDaoImpl() {
super();
queueIdSearch = createSearchBuilder(Long.class);
queueIdSearch.and("contentId", queueIdSearch.entity().getContentId(), Op.EQ);

View File

@ -1,203 +0,0 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!--
components.xml is the configuration file for the VM Ops
insertion servers. Someone can quickly pull together an
insertion server by selecting the correct adapters to use.
Here are some places to look for information.
- To find out the general functionality that each Manager
or Adapter provide, look at the javadoc for the interface
that it implements. The interface is usually the
"key" attribute in the declaration.
- To find specific implementation of each Manager or
Adapter, look at the javadoc for the actual class. The
class can be found in the <class> element.
- To find out the configuration parameters for each Manager
or Adapter, look at the javadoc for the actual implementation
class. It should be documented in the description of the
class.
- To know more about the components.xml in general, look for
the javadoc for ComponentLocator.java.
If you found that the Manager or Adapter are not properly
documented, please contact the author.
-->
<components.xml>
<management-server class="com.vmops.server.ManagementServerImpl">
<dao name="domain router" class="com.cloud.vm.dao.DomainRouterDaoImpl"/>
<dao name="host" class="com.cloud.host.dao.HostDaoImpl"/>
<dao name="VM Instance" class="com.cloud.vm.dao.VMInstanceDaoImpl"/>
<dao name="User VM" class="com.cloud.vm.dao.UserVmDaoImpl"/>
<dao name="ServiceOffering" class="com.cloud.service.dao.ServiceOfferingDaoImpl">
<param name="cache.size">50</param>
<param name="cache.time.to.live">-1</param>
</dao>
<dao name="DiskOffering" class="com.cloud.storage.dao.DiskOfferingDaoImpl"/>
<dao name="VMDisk" class="com.cloud.storage.dao.VmDiskDaoImpl"/>
<dao name="host zone" class="com.cloud.dc.dao.DataCenterDaoImpl">
<param name="cache.size">50</param>
<param name="cache.time.to.live">-1</param>
</dao>
<dao name="host pod" class="com.cloud.dc.dao.HostPodDaoImpl">
<param name="cache.size">50</param>
<param name="cache.time.to.live">-1</param>
</dao>
<dao name="vlan" class="com.cloud.dc.dao.VlanDaoImpl">
</dao>
<dao name="Volume" class="com.cloud.storage.dao.VolumeDaoImpl"/>
<dao name="Events" class="com.cloud.event.dao.EventDaoImpl"/>
<dao name="VM Template" class="com.cloud.storage.dao.VMTemplateDaoImpl">
<param name="cache.size">50</param>
<param name="cache.time.to.live">-1</param>
<param name="routing.uniquename">routing</param>
</dao>
<dao name="User" class="com.cloud.user.dao.UserDaoImpl">
<param name="cache.size">5000</param>
<param name="cache.time.to.live">300</param>
</dao>
<dao name="UserStats" class="com.cloud.user.dao.UserStatisticsDaoImpl"/>
<dao name="Disk Template" class="com.cloud.storage.dao.DiskTemplateDaoImpl">
<param name="cache.size">50</param>
<param name="cache.time.to.live">-1</param>
</dao>
<dao name="Firewall Rules" class="com.cloud.network.dao.FirewallRulesDaoImpl"/>
<dao name="Security Group" class="com.cloud.network.dao.SecurityGroupDaoImpl"/>
<dao name="Load Balancer" class="com.cloud.network.dao.LoadBalancerDaoImpl"/>
<dao name="Network Rule Config" class="com.cloud.network.dao.NetworkRuleConfigDaoImpl"/>
<dao name="Security Group Mapping" class="com.cloud.network.dao.SecurityGroupVMMapDaoImpl"/>
<dao name="Load Balancer Mapping" class="com.cloud.network.dao.LoadBalancerVMMapDaoImpl"/>
<dao name="IP Addresses" class="com.cloud.network.dao.IPAddressDaoImpl"/>
<dao name="Datacenter IP Addresses" class="com.cloud.dc.dao.DataCenterIpAddressDaoImpl"/>
<dao name="Pricing" class="com.cloud.pricing.dao.PricingDaoImpl"/>
<dao name="Usage" class="com.cloud.usage.dao.UsageDaoImpl"/>
<dao name="UsageJob" class="com.cloud.usage.dao.UsageJobDaoImpl"/>
<dao name="Alert" class="com.cloud.alert.dao.AlertDaoImpl"/>
<dao name="Capacity" class="com.cloud.capacity.dao.CapacityDaoImpl"/>
<dao name="Domain" class="com.cloud.domain.dao.DomainDaoImpl"/>
<dao name="Account" class="com.cloud.user.dao.AccountDaoImpl"/>
<dao name="Limit" class="com.cloud.configuration.dao.LimitDaoImpl"/>
<dao name="UserAccount" class="com.cloud.user.dao.UserAccountDaoImpl"/>
<dao name="Usage IPAddress" class="com.cloud.usage.dao.UsageIPAddressDaoImpl"/>
<dao name="VM Template Host" class="com.cloud.storage.dao.VMTemplateHostDaoImpl"/>
<dao name="VM Template Swift" class="com.cloud.storage.dao.VMTemplateSwiftDaoImpl"/>
<dao name="Upload" class="com.cloud.storage.dao.UploadDaoImpl"/>
<dao name="VM Template Pool" class="com.cloud.storage.dao.VMTemplatePoolDaoImpl"/>
<dao name="Launch Permission" class="com.cloud.storage.dao.LaunchPermissionDaoImpl"/>
<dao name="Configuration" class="com.cloud.configuration.dao.ConfigurationDaoImpl"/>
<dao name="HA" class="com.cloud.ha.dao.HighAvailabilityDaoImpl"/>
<dao name="Console Proxy" class="com.cloud.vm.dao.ConsoleProxyDaoImpl"/>
<dao name="Upgrade" class="com.cloud.maint.dao.AgentUpgradeDaoImpl"/>
<dao name="Management Server Host" class="com.cloud.cluster.dao.ManagementServerHostDaoImpl"/>
<dao name="Snapshot" class="com.cloud.storage.dao.SnapshotDaoImpl"/>
<dao name="ScheduledVMBackup" class="com.cloud.user.dao.ScheduledVMBackupDaoImpl"/>
<dao name="AsyncJobDao" class="com.cloud.async.dao.AsyncJobDaoImpl"/>
<dao name="SyncQueueDao" class="com.cloud.async.dao.SyncQueueDaoImpl"/>
<dao name="SyncQueueItemDao" class="com.cloud.async.dao.SyncQueueItemDaoImpl"/>
<dao name="GuestOSDao" class="com.cloud.storage.dao.GuestOSDaoImpl"/>
<dao name="StoragePoolDao" class="com.cloud.storage.dao.StoragePoolDaoImpl"/>
<dao name="StoragePoolHostDao" class="com.cloud.storage.dao.StoragePoolHostDaoImpl"/>
<dao name="HostDetails" class="com.cloud.host.dao.DetailsDaoImpl"/>
<adapters key="com.cloud.agent.manager.allocator.HostAllocator">
<adapter name="FirstFitRouting" class="com.cloud.agent.manager.allocator.impl.FirstFitRoutingAllocator"/>
<adapter name="FirstFit" class="com.cloud.agent.manager.allocator.impl.FirstFitAllocator"/>
</adapters>
<adapters key="com.cloud.agent.manager.allocator.StorageAllocator">
<adapter name="Storage" class="com.cloud.agent.manager.allocator.impl.FirstFitStorageAllocator">
<param name="storage.overprovisioning.factor">2</param>
</adapter>
<adapter name="Storage" class="com.cloud.agent.manager.allocator.impl.RandomStoragePoolAllocator">
<param name="storage.overprovisioning.factor">2</param>
</adapter>
</adapters>
<adapters key="com.cloud.agent.manager.allocator.PodAllocator">
<adapter name="User First" class="com.cloud.agent.manager.allocator.impl.UserConcentratedAllocator"/>
</adapters>
<adapters key="com.cloud.consoleproxy.ConsoleProxyAllocator">
<adapter name="Balance" class="com.cloud.consoleproxy.ConsoleProxyBalanceAllocator"/>
</adapters>
<adapters key="com.cloud.server.auth.UserAuthenticator">
<adapter name="MD5" class="com.cloud.server.auth.MD5UserAuthenticator"/>
</adapters>
<adapters key="com.cloud.ha.Investigator">
<adapter name="SimpleInvestigator" class="com.cloud.ha.CheckOnAgentInvestigator"/>
<adapter name="PingInvestigator" class="com.cloud.ha.InvestigatorImpl"/>
</adapters>
<adapters key="com.cloud.ha.FenceBuilder">
<adapter name="StorageFenceBuilder" class="com.cloud.ha.StorageFence"/>
</adapters>
<adapters key="com.cloud.cluster.ClusterServiceAdapter">
<adapter name="ClusterService" class="com.cloud.cluster.ClusterServiceServletAdapter"/>
</adapters>
<adapters key="com.cloud.resource.Discoverer">
<adapter name="XenServer" class="com.cloud.resource.xen.XenServerDiscoverer"/>
<adapter name="SecondaryStorage" class="com.cloud.storage.secondary.SecondaryStorageDiscoverer"/>
<adapter name="DummyHostServer" class="com.cloud.resource.DummyHostDiscoverer"/>
</adapters>
<manager name="account manager" class="com.cloud.user.AccountManagerImpl">
</manager>
<manager name="agent manager" class="com.cloud.agent.manager.ClusteredAgentManagerImpl">
</manager>
<manager name="configuration manager" class="com.cloud.configuration.ConfigurationManagerImpl">
<param name="premium">true</param>
</manager>
<manager name="network manager" class="com.cloud.network.NetworkManagerImpl">
</manager>
<manager name="download manager" class="com.cloud.storage.download.DownloadMonitorImpl">
</manager>
<manager name="upload manager" class="com.cloud.storage.upload.UploadMonitorImpl">
</manager>
<manager name="console proxy manager" class="com.cloud.consoleproxy.ConsoleProxyManagerImpl">
</manager>
<manager name="vm manager" class="com.cloud.vm.UserVmManagerImpl"/>
<manager name="upgrade manager" class="com.cloud.maint.UpgradeManagerImpl">
</manager>
<manager name="StorageManager" class="com.cloud.storage.StorageManagerImpl">
</manager>
<manager name="HA Manager" class="com.cloud.ha.HighAvailabilityManagerExtImpl">
</manager>
<manager name="Cluster Manager" class="com.cloud.cluster.ClusterManagerImpl">
</manager>
<manager name="SyncQueueManager" class="com.cloud.async.SyncQueueManagerImpl">
</manager>
<manager name="AsyncJobManager" class="com.cloud.async.AsyncJobManagerImpl">
</manager>
<manager name="AsyncJobExecutorContext" class="com.cloud.async.AsyncJobExecutorContextImpl">
</manager>
<manager name="Alert Manager" class="com.cloud.alert.AlertManagerImpl">
</manager>
<manager name="Template Manager" class="com.cloud.template.TemplateManagerImpl">
</manager>
<adapters key="com.cloud.alert.AlertAdapter">
<adapter name="ClusterAlert" class="com.cloud.alert.ClusterAlertAdapter"/>
<adapter name="ConsoleProxyAlert" class="com.cloud.alert.ConsoleProxyAlertAdapter"/>
</adapters>
</management-server>
</components.xml>

View File

@ -1,29 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.async;
import org.apache.log4j.Logger;
public class CleanupDelegate implements com.cloud.utils.CleanupDelegate<String, Object> {
private static final Logger s_logger = Logger.getLogger(CleanupDelegate.class);
@Override
public boolean cleanup(String param, Object managerContext) {
s_logger.info("Action called with param: " + param);
return true;
}
}

View File

@ -1,281 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.async;
import java.util.List;
import junit.framework.Assert;
import org.apache.log4j.Logger;
import com.cloud.cluster.CheckPointVO;
import com.cloud.cluster.dao.StackMaidDao;
import com.cloud.cluster.dao.StackMaidDaoImpl;
import com.cloud.utils.db.Transaction;
import com.cloud.utils.testcase.Log4jEnabledTestCase;
public class TestAsync extends Log4jEnabledTestCase {
private static final Logger s_logger = Logger.getLogger(TestAsync.class);
/*
public static class SampleAsyncResult {
@Param(name="name", propName="name")
private final String _name;
@Param
private final int count;
public SampleAsyncResult(String name, int count) {
_name = name;
this.count = count;
}
public String getName() { return _name; }
public int getCount() { return count; }
}
public void testDao() {
AsyncJobDao dao = new AsyncJobDaoImpl();
AsyncJobVO job = new AsyncJobVO(1, 1, "TestCmd", null);
job.setInstanceType("user_vm");
job.setInstanceId(1000L);
char[] buf = new char[1024];
for(int i = 0; i < 1024; i++)
buf[i] = 'a';
job.setResult(new String(buf));
dao.persist(job);
AsyncJobVO jobVerify = dao.findById(job.getId());
Assert.assertTrue(jobVerify.getCmd().equals(job.getCmd()));
Assert.assertTrue(jobVerify.getUserId() == 1);
Assert.assertTrue(jobVerify.getAccountId() == 1);
String result = jobVerify.getResult();
for(int i = 0; i < 1024; i++)
Assert.assertTrue(result.charAt(i) == 'a');
jobVerify = dao.findInstancePendingAsyncJob("user_vm", 1000L);
Assert.assertTrue(jobVerify != null);
Assert.assertTrue(jobVerify.getCmd().equals(job.getCmd()));
Assert.assertTrue(jobVerify.getUserId() == 1);
Assert.assertTrue(jobVerify.getAccountId() == 1);
}
public void testSerialization() {
List<Pair<String, Object>> l;
int value = 1;
l = SerializerHelper.toPairList(value, "result");
Assert.assertTrue(l.size() == 1);
Assert.assertTrue(l.get(0).first().equals("result"));
Assert.assertTrue(l.get(0).second().equals("1"));
l.clear();
SampleAsyncResult result = new SampleAsyncResult("vmops", 1);
l = SerializerHelper.toPairList(result, "result");
Assert.assertTrue(l.size() == 2);
Assert.assertTrue(l.get(0).first().equals("name"));
Assert.assertTrue(l.get(0).second().equals("vmops"));
Assert.assertTrue(l.get(1).first().equals("count"));
Assert.assertTrue(l.get(1).second().equals("1"));
}
public void testAsyncResult() {
AsyncJobResult result = new AsyncJobResult(1);
result.setResultObject(100);
Assert.assertTrue(result.getResult().equals("java.lang.Integer/100"));
Object obj = result.getResultObject();
Assert.assertTrue(obj instanceof Integer);
Assert.assertTrue(((Integer)obj).intValue() == 100);
}
public void testTransaction() {
Transaction txn = Transaction.open("testTransaction");
try {
txn.start();
AsyncJobDao dao = new AsyncJobDaoImpl();
AsyncJobVO job = new AsyncJobVO(1, 1, "TestCmd", null);
job.setInstanceType("user_vm");
job.setInstanceId(1000L);
job.setResult("");
dao.persist(job);
txn.rollback();
} finally {
txn.close();
}
}
public void testMorevingian() {
int threadCount = 10;
final int testCount = 10;
Thread[] threads = new Thread[threadCount];
for(int i = 0; i < threadCount; i++) {
final int threadNum = i + 1;
threads[i] = new Thread(new Runnable() {
public void run() {
for(int i = 0; i < testCount; i++) {
Transaction txn = Transaction.open(Transaction.CLOUD_DB);
try {
AsyncJobDao dao = new AsyncJobDaoImpl();
s_logger.info("Thread " + threadNum + " acquiring lock");
AsyncJobVO job = dao.acquire(1L, 30);
if(job != null) {
s_logger.info("Thread " + threadNum + " acquired lock");
try {
Thread.sleep(Log4jEnabledTestCase.getRandomMilliseconds(1000, 3000));
} catch (InterruptedException e) {
}
s_logger.info("Thread " + threadNum + " acquiring lock nestly");
AsyncJobVO job2 = dao.acquire(1L, 30);
if(job2 != null) {
s_logger.info("Thread " + threadNum + " acquired lock nestly");
try {
Thread.sleep(Log4jEnabledTestCase.getRandomMilliseconds(1000, 3000));
} catch (InterruptedException e) {
}
s_logger.info("Thread " + threadNum + " releasing lock (nestly acquired)");
dao.release(1L);
s_logger.info("Thread " + threadNum + " released lock (nestly acquired)");
} else {
s_logger.info("Thread " + threadNum + " was unable to acquire lock nestly");
}
s_logger.info("Thread " + threadNum + " releasing lock");
dao.release(1L);
s_logger.info("Thread " + threadNum + " released lock");
} else {
s_logger.info("Thread " + threadNum + " was unable to acquire lock");
}
} finally {
txn.close();
}
try {
Thread.sleep(Log4jEnabledTestCase.getRandomMilliseconds(1000, 10000));
} catch (InterruptedException e) {
}
}
}
});
}
for(int i = 0; i < threadCount; i++) {
threads[i].start();
}
for(int i = 0; i < threadCount; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
}
}
}
*/
public void testMaid() {
Transaction txn = Transaction.open(Transaction.CLOUD_DB);
StackMaidDao dao = new StackMaidDaoImpl();
dao.pushCleanupDelegate(1L, 0, "delegate1", "Hello, world");
dao.pushCleanupDelegate(1L, 1, "delegate2", new Long(100));
dao.pushCleanupDelegate(1L, 2, "delegate3", null);
CheckPointVO item = dao.popCleanupDelegate(1L);
Assert.assertTrue(item.getDelegate().equals("delegate3"));
Assert.assertTrue(item.getContext() == null);
item = dao.popCleanupDelegate(1L);
Assert.assertTrue(item.getDelegate().equals("delegate2"));
s_logger.info(item.getContext());
item = dao.popCleanupDelegate(1L);
Assert.assertTrue(item.getDelegate().equals("delegate1"));
s_logger.info(item.getContext());
txn.close();
}
public void testMaidClear() {
Transaction txn = Transaction.open(Transaction.CLOUD_DB);
StackMaidDao dao = new StackMaidDaoImpl();
dao.pushCleanupDelegate(1L, 0, "delegate1", "Hello, world");
dao.pushCleanupDelegate(1L, 1, "delegate2", new Long(100));
dao.pushCleanupDelegate(1L, 2, "delegate3", null);
dao.clearStack(1L);
Assert.assertTrue(dao.popCleanupDelegate(1L) == null);
txn.close();
}
public void testMaidLeftovers() {
Thread[] threads = new Thread[3];
for(int i = 0; i < 3; i++) {
final int threadNum = i+1;
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
Transaction txn = Transaction.open(Transaction.CLOUD_DB);
StackMaidDao dao = new StackMaidDaoImpl();
dao.pushCleanupDelegate(1L, 0, "delegate-" + threadNum, "Hello, world");
dao.pushCleanupDelegate(1L, 1, "delegate-" + threadNum, new Long(100));
dao.pushCleanupDelegate(1L, 2, "delegate-" + threadNum, null);
txn.close();
}
});
threads[i].start();
}
for(int i = 0; i < 3; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
}
}
Transaction txn = Transaction.open(Transaction.CLOUD_DB);
StackMaidDao dao = new StackMaidDaoImpl();
List<CheckPointVO> l = dao.listLeftoversByMsid(1L);
for(CheckPointVO maid : l) {
s_logger.info("" + maid.getThreadId() + " " + maid.getDelegate() + " " + maid.getContext());
}
txn.close();
}
}

View File

@ -16,237 +16,44 @@
// under the License.
package com.cloud.async;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.inject.Inject;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.cloud.domain.DomainVO;
import com.cloud.domain.dao.DomainDao;
import com.cloud.domain.dao.DomainDaoImpl;
import com.cloud.exception.PermissionDeniedException;
import com.cloud.host.HostVO;
import com.cloud.host.dao.HostDao;
import com.cloud.host.dao.HostDaoImpl;
import com.cloud.cluster.ClusterManager;
import com.cloud.utils.component.ComponentContext;
import com.cloud.utils.db.Transaction;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations="classpath:/AsyncJobTestContext.xml")
public class TestAsyncJobManager extends TestCase {
public static final Logger s_logger = Logger.getLogger(TestAsyncJobManager.class.getName());
volatile long s_count = 0;
@Inject AsyncJobManager asyncMgr;
@Inject ClusterManager clusterMgr;
public void asyncCall() {
// long jobId = mgr.rebootVirtualMachineAsync(1, 1);
long jobId = 0L;
s_logger.info("Async-call job id: " + jobId);
while(true) {
AsyncJobResult result;
try {
result = asyncMgr.queryAsyncJobResult(jobId);
if(result.getJobStatus() != AsyncJobResult.STATUS_IN_PROGRESS) {
s_logger.info("Async-call completed, result: " + result.toString());
break;
}
s_logger.info("Async-call is in progress, progress: " + result.toString());
} catch (PermissionDeniedException e1) {
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
public void sequence() {
final HostDao hostDao = new HostDaoImpl();
long seq = hostDao.getNextSequence(1);
s_logger.info("******* seq : " + seq + " ********");
HashMap<Long, Long> hashMap = new HashMap<Long, Long>();
final Map<Long, Long> map = Collections.synchronizedMap(hashMap);
s_count = 0;
final long maxCount = 1000000; // test one million times
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
while(s_count < maxCount) {
s_count++;
long seq = hostDao.getNextSequence(1);
Assert.assertTrue(map.put(seq, seq) == null);
}
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
while(s_count < maxCount) {
s_count++;
long seq = hostDao.getNextSequence(1);
Assert.assertTrue(map.put(seq, seq) == null);
}
}
});
t1.start();
t2.start();
try {
t1.join();
t2.join();
} catch (InterruptedException e) {
}
}
/*
public void ipAssignment() {
final IPAddressDao ipAddressDao = new IPAddressDaoImpl();
final ConcurrentHashMap<String, IPAddressVO> map = new ConcurrentHashMap<String, IPAddressVO>();
//final Map<String, String> map = Collections.synchronizedMap(hashMap);
s_count = 0;
final long maxCount = 1000000; // test one million times
Thread t1 = new Thread(new Runnable() {
public void run() {
while(s_count < maxCount) {
s_count++;
Transaction txn = Transaction.open("Alex1");
try {
IPAddressVO addr = ipAddressDao.assignIpAddress(1, 0, 1, false);
IPAddressVO returnStr = map.put(addr.getAddress(), addr);
if(returnStr != null) {
System.out.println("addr : " + addr.getAddress());
}
Assert.assertTrue(returnStr == null);
} finally {
txn.close();
}
}
}
});
Thread t2 = new Thread(new Runnable() {
public void run() {
while(s_count < maxCount) {
s_count++;
Transaction txn = Transaction.open("Alex2");
try {
IPAddressVO addr = ipAddressDao.assignIpAddress(1, 0, 1, false);
Assert.assertTrue(map.put(addr.getAddress(), addr) == null);
} finally {
txn.close();
}
}
}
});
t1.start();
t2.start();
try {
t1.join();
t2.join();
} catch (InterruptedException e) {
}
}
*/
private long getRandomLockId() {
return 1L;
/*
* will use in the future test cases
int i = new Random().nextInt();
if(i % 2 == 0)
return 1L;
return 2L;
*/
}
public void tstLocking() {
int testThreads = 20;
Thread[] threads = new Thread[testThreads];
for(int i = 0; i < testThreads; i++) {
final int current = i;
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
final HostDao hostDao = new HostDaoImpl();
while(true) {
Transaction txn = Transaction.currentTxn();
try {
HostVO host = hostDao.acquireInLockTable(getRandomLockId(), 10);
if(host != null) {
s_logger.info("Thread " + (current + 1) + " acquired lock");
try { Thread.sleep(1000); } catch (InterruptedException e) {}
s_logger.info("Thread " + (current + 1) + " released lock");
hostDao.releaseFromLockTable(host.getId());
try { Thread.sleep(1000); } catch (InterruptedException e) {}
} else {
s_logger.info("Thread " + (current + 1) + " is not able to acquire lock");
}
} finally {
txn.close();
}
}
}
});
threads[i].start();
}
try {
for(int i = 0; i < testThreads; i++)
threads[i].join();
} catch(InterruptedException e) {
}
}
public void testDomain() {
DomainDao domainDao = new DomainDaoImpl();
DomainVO domain1 = new DomainVO("d1", 2L, 1L, null, 1);
domainDao.create(domain1);
DomainVO domain2 = new DomainVO("d2", 2L, 1L, null, 1);
domainDao.create(domain2);
DomainVO domain3 = new DomainVO("d3", 2L, 1L, null, 1);
domainDao.create(domain3);
DomainVO domain11 = new DomainVO("d11", 2L, domain1.getId(), null, 1);
domainDao.create(domain11);
domainDao.remove(domain11.getId());
DomainVO domain12 = new DomainVO("d12", 2L, domain1.getId(), null, 1);
domainDao.create(domain12);
domainDao.remove(domain3.getId());
DomainVO domain4 = new DomainVO("d4", 2L, 1L, null, 1);
domainDao.create(domain4);
@Before
public void setUp() {
ComponentContext.initComponentsLifeCycle();
Mockito.when(clusterMgr.getManagementNodeId()).thenReturn(1L);
Transaction.open("dummy");
}
@After
public void tearDown() {
Transaction.currentTxn().close();
}
@Test
public void test() {
}
}

View File

@ -23,18 +23,40 @@ import javax.inject.Inject;
import junit.framework.TestCase;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.cloud.utils.component.ComponentContext;
import com.cloud.utils.db.Transaction;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations="classpath:/SyncQueueTestContext.xml")
public class TestSyncQueueManager extends TestCase {
public static final Logger s_logger = Logger.getLogger(TestSyncQueueManager.class.getName());
private volatile int count = 0;
private volatile long expectingCurrent = 1;
@Inject SyncQueueManager mgr;
@Before
public void setUp() {
ComponentContext.initComponentsLifeCycle();
Transaction.open("dummy");
}
@After
public void tearDown() {
Transaction.currentTxn().close();
}
public void leftOverItems() {
List<SyncQueueItemVO> l = mgr.getActiveQueueItems(1L, false);
if(l != null && l.size() > 0) {
for(SyncQueueItemVO item : l) {
@ -72,8 +94,7 @@ public class TestSyncQueueManager extends TestCase {
}
}
}
}
);
});
Thread thread2 = new Thread(new Runnable() {
@Override
@ -95,8 +116,7 @@ public class TestSyncQueueManager extends TestCase {
}
}
}
}
);
});
thread1.start();
thread2.start();
@ -143,8 +163,7 @@ public class TestSyncQueueManager extends TestCase {
}
}
}
}
);
});
Thread thread2 = new Thread(new Runnable() {
@Override
@ -167,8 +186,7 @@ public class TestSyncQueueManager extends TestCase {
}
}
}
}
);
});
thread1.start();
thread2.start();
@ -192,6 +210,7 @@ public class TestSyncQueueManager extends TestCase {
mgr.queue("vm_instance", q, "Async-job", i+1, 1);
}
@Test
public void testSyncQueue() {
mgr.queue("vm_instance", 1, "Async-job", 1, 1);

View File

@ -1,26 +0,0 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<components.xml>
<management-server class="com.cloud.async.TestSyncQueueManager">
<dao name="sync queue dao" class="com.cloud.async.dao.SyncQueueDaoImpl"/>
<dao name="sync queue item dao" class="com.cloud.async.dao.SyncQueueItemDaoImpl"/>
<manager name="sync queue manager" class="com.cloud.async.SyncQueueManagerImpl">
</manager>
</management-server>
</components.xml>