CLOUDSTACK-304: Synchronization support for createSnapshot command - don't sent more than "concurrent.snapshots.threshold.perhost" createSnapshots commands to the backend host.

Conflicts:

	core/src/com/cloud/async/AsyncJobVO.java
	core/src/com/cloud/async/SyncQueueVO.java
	server/src/com/cloud/api/ApiDispatcher.java
	server/src/com/cloud/api/ApiServer.java
	server/src/com/cloud/async/AsyncJobManagerImpl.java
	server/src/com/cloud/async/SyncQueueManager.java
	server/src/com/cloud/async/SyncQueueManagerImpl.java
	server/src/com/cloud/async/dao/SyncQueueDao.java
	server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java
	server/test/com/cloud/async/TestSyncQueueManager.java
	setup/db/create-schema.sql
This commit is contained in:
Alena Prokharchyk 2012-10-03 18:36:51 -07:00
parent c36744a18c
commit 339aa41442
20 changed files with 671 additions and 626 deletions

View File

@ -25,10 +25,11 @@ import com.cloud.user.UserContext;
* queryAsyncJobResult API command.
*/
public abstract class BaseAsyncCmd extends BaseCmd {
public static final String ipAddressSyncObject = "ipaddress";
public static final String networkSyncObject = "network";
public static final String vpcSyncObject = "vpc";
public static final String snapshotHostSyncObject = "snapshothost";
private AsyncJob job;

View File

@ -19,6 +19,7 @@ package com.cloud.api.commands;
import org.apache.log4j.Logger;
import com.cloud.api.ApiConstants;
import com.cloud.api.BaseAsyncCmd;
import com.cloud.api.BaseAsyncCreateCmd;
import com.cloud.api.BaseCmd;
import com.cloud.api.IdentityMapper;
@ -60,6 +61,8 @@ public class CreateSnapshotCmd extends BaseAsyncCreateCmd {
@IdentityMapper(entityTableName="snapshot_policy")
@Parameter(name = ApiConstants.POLICY_ID, type = CommandType.LONG, description = "policy id of the snapshot, if this is null, then use MANUAL_POLICY.")
private Long policyId;
private String syncObjectType = BaseAsyncCmd.snapshotHostSyncObject;
// ///////////////////////////////////////////////////
// ///////////////// Accessors ///////////////////////
@ -88,7 +91,16 @@ public class CreateSnapshotCmd extends BaseAsyncCreateCmd {
return Snapshot.MANUAL_POLICY_ID;
}
}
private Long getHostId() {
Volume volume = _entityMgr.findById(Volume.class, getVolumeId());
if (volume == null) {
throw new InvalidParameterValueException("Unable to find volume by id");
}
return _snapshotService.getHostIdForSnapshotOperation(volume);
}
// ///////////////////////////////////////////////////
// ///////////// API Implementation///////////////////
// ///////////////////////////////////////////////////
@ -161,4 +173,21 @@ public class CreateSnapshotCmd extends BaseAsyncCreateCmd {
throw new ServerApiException(BaseCmd.INTERNAL_ERROR, "Failed to create snapshot due to an internal error creating snapshot for volume " + volumeId);
}
}
@Override
public String getSyncObjType() {
if (getSyncObjId() != null) {
return syncObjectType;
}
return null;
}
@Override
public Long getSyncObjId() {
if (getHostId() != null) {
return getHostId();
}
return null;
}
}

View File

@ -26,6 +26,7 @@ import com.cloud.api.commands.ListSnapshotsCmd;
import com.cloud.exception.PermissionDeniedException;
import com.cloud.exception.ResourceAllocationException;
import com.cloud.storage.Snapshot;
import com.cloud.storage.Volume;
import com.cloud.user.Account;
public interface SnapshotService {
@ -97,4 +98,10 @@ public interface SnapshotService {
* @return the Snapshot that was created
*/
Snapshot createSnapshot(Long volumeId, Long policyId, Long snapshotId, Account snapshotOwner);
/**
* @param vol
* @return
*/
Long getHostIdForSnapshotOperation(Volume vol);
}

View File

@ -119,28 +119,31 @@ public class AsyncJobVO implements AsyncJob {
@Transient
private boolean fromPreviousSession = false;
public AsyncJobVO() {
this.uuid = UUID.randomUUID().toString();
}
public AsyncJobVO(long userId, long accountId, String cmd, String cmdInfo) {
this.userId = userId;
this.accountId = accountId;
this.cmd = cmd;
this.cmdInfo = cmdInfo;
public AsyncJobVO() {
this.uuid = UUID.randomUUID().toString();
}
public AsyncJobVO(long userId, long accountId, String cmd, String cmdInfo, Long instanceId, Type instanceType) {
this.userId = userId;
this.accountId = accountId;
this.cmd = cmd;
this.cmdInfo = cmdInfo;
this.callbackType = CALLBACK_POLLING;
this.uuid = UUID.randomUUID().toString();
}
public AsyncJobVO(long userId, long accountId, String cmd, String cmdInfo,
int callbackType, String callbackAddress) {
this(userId, accountId, cmd, cmdInfo);
this.callbackType = callbackType;
this.callbackAddress = callbackAddress;
this.instanceId = instanceId;
}
public AsyncJobVO(long userId, long accountId, String cmd, String cmdInfo,
int callbackType, String callbackAddress, Long instanceId, Type instanceType) {
this(userId, accountId, cmd, cmdInfo, instanceId, instanceType);
this.callbackType = callbackType;
this.callbackAddress = callbackAddress;
this.uuid = UUID.randomUUID().toString();
}
@Override
public Long getId() {
return id;

View File

@ -24,6 +24,8 @@ import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
import javax.persistence.Temporal;
import javax.persistence.TemporalType;
@Entity
@Table(name="sync_queue_item")
@ -49,6 +51,10 @@ public class SyncQueueItemVO implements SyncQueueItem{
@Column(name="queue_proc_number")
private Long lastProcessNumber;
@Column(name="queue_proc_time")
@Temporal(TemporalType.TIMESTAMP)
private Date lastProcessTime;
@Column(name="created")
private Date created;
@ -93,7 +99,7 @@ public class SyncQueueItemVO implements SyncQueueItem{
public void setLastProcessMsid(Long lastProcessMsid) {
this.lastProcessMsid = lastProcessMsid;
}
public Long getLastProcessNumber() {
return lastProcessNumber;
}
@ -109,7 +115,7 @@ public class SyncQueueItemVO implements SyncQueueItem{
public void setCreated(Date created) {
this.created = created;
}
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("SyncQueueItemVO {id:").append(getId()).append(", queueId: ").append(getQueueId());
@ -117,8 +123,17 @@ public class SyncQueueItemVO implements SyncQueueItem{
sb.append(", contentId: ").append(getContentId());
sb.append(", lastProcessMsid: ").append(getLastProcessMsid());
sb.append(", lastprocessNumber: ").append(getLastProcessNumber());
sb.append(", lastProcessTime: ").append(getLastProcessTime());
sb.append(", created: ").append(getCreated());
sb.append("}");
return sb.toString();
}
public Date getLastProcessTime() {
return lastProcessTime;
}
public void setLastProcessTime(Date lastProcessTime) {
this.lastProcessTime = lastProcessTime;
}
}

View File

@ -18,7 +18,6 @@
package com.cloud.async;
import java.util.Date;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
@ -40,36 +39,31 @@ public class SyncQueueVO {
@Column(name="sync_objtype")
private String syncObjType;
@Column(name="sync_objid")
private Long syncObjId;
@Column(name="queue_proc_number")
private Long lastProcessNumber;
@Column(name="queue_proc_time")
@Temporal(TemporalType.TIMESTAMP)
private Date lastProcessTime;
@Column(name="queue_proc_msid")
private Long lastProcessMsid;
@Column(name="created")
@Temporal(TemporalType.TIMESTAMP)
private Date created;
@Column(name="last_updated")
@Temporal(TemporalType.TIMESTAMP)
private Date lastUpdated;
@Column(name="queue_size")
private long queueSize = 0;
@Column(name="queue_size_limit")
private long queueSizeLimit = 0;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getSyncObjType() {
return syncObjType;
}
@ -94,22 +88,6 @@ public class SyncQueueVO {
lastProcessNumber = number;
}
public Date getLastProcessTime() {
return lastProcessTime;
}
public void setLastProcessTime(Date lastProcessTime) {
this.lastProcessTime = lastProcessTime;
}
public Long getLastProcessMsid() {
return lastProcessMsid;
}
public void setLastProcessMsid(Long lastProcessMsid) {
this.lastProcessMsid = lastProcessMsid;
}
public Date getCreated() {
return created;
}
@ -125,18 +103,33 @@ public class SyncQueueVO {
public void setLastUpdated(Date lastUpdated) {
this.lastUpdated = lastUpdated;
}
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("SyncQueueVO {id:").append(getId());
sb.append(", syncObjType: ").append(getSyncObjType());
sb.append(", syncObjId: ").append(getSyncObjId());
sb.append(", lastProcessMsid: ").append(getLastProcessMsid());
sb.append(", lastProcessNumber: ").append(getLastProcessNumber());
sb.append(", lastProcessTime: ").append(getLastProcessTime());
sb.append(", lastUpdated: ").append(getLastUpdated());
sb.append(", created: ").append(getCreated());
sb.append(", count: ").append(getQueueSize());
sb.append("}");
return sb.toString();
}
public long getQueueSize() {
return queueSize;
}
public void setQueueSize(long queueSize) {
this.queueSize = queueSize;
}
public long getQueueSizeLimit() {
return queueSizeLimit;
}
public void setQueueSizeLimit(long queueSizeLimit) {
this.queueSizeLimit = queueSizeLimit;
}
}

View File

@ -33,6 +33,8 @@ import com.cloud.api.BaseCmd.CommandType;
import com.cloud.api.commands.ListEventsCmd;
import com.cloud.async.AsyncCommandQueued;
import com.cloud.async.AsyncJobManager;
import com.cloud.configuration.Config;
import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.exception.AccountLimitException;
import com.cloud.exception.InsufficientCapacityException;
import com.cloud.exception.InvalidParameterValueException;
@ -44,6 +46,8 @@ import com.cloud.server.ManagementServer;
import com.cloud.user.Account;
import com.cloud.user.UserContext;
import com.cloud.utils.DateUtil;
import com.cloud.utils.IdentityProxy;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.component.ComponentLocator;
import com.cloud.utils.component.PluggableService;
import com.cloud.utils.exception.CSExceptionErrorCode;
@ -59,6 +63,7 @@ public class ApiDispatcher {
ComponentLocator _locator;
AsyncJobManager _asyncMgr;
IdentityDao _identityDao;
long _createSnapshotQueueSizeLimit;
// singleton class
private static ApiDispatcher s_instance = new ApiDispatcher();
@ -71,6 +76,9 @@ public class ApiDispatcher {
_locator = ComponentLocator.getLocator(ManagementServer.Name);
_asyncMgr = _locator.getManager(AsyncJobManager.class);
_identityDao = _locator.getDao(IdentityDao.class);
ConfigurationDao configDao = _locator.getDao(ConfigurationDao.class);
Map<String, String> configs = configDao.getConfiguration();
_createSnapshotQueueSizeLimit = NumbersUtil.parseInt(configs.get(Config.ConcurrentSnapshotsThresholdPerHost.key()), 10);
}
public void dispatchCreateCmd(BaseAsyncCreateCmd cmd, Map<String, String> params) {
@ -130,8 +138,14 @@ public class ApiDispatcher {
ctx.setStartEventId(Long.valueOf(startEventId));
// Synchronise job on the object if needed
if (asyncCmd.getJob() != null && asyncCmd.getSyncObjId() != null && asyncCmd.getSyncObjType() != null) {
_asyncMgr.syncAsyncJobExecution(asyncCmd.getJob(), asyncCmd.getSyncObjType(), asyncCmd.getSyncObjId().longValue());
long queueSizeLimit = 1;
if (asyncCmd.getSyncObjType() != null && asyncCmd.getSyncObjType().equalsIgnoreCase(BaseAsyncCmd.snapshotHostSyncObject)) {
queueSizeLimit = _createSnapshotQueueSizeLimit;
}
_asyncMgr.syncAsyncJobExecution(asyncCmd.getJob(), asyncCmd.getSyncObjType(),
asyncCmd.getSyncObjId().longValue(), queueSizeLimit);
}
}

View File

@ -116,6 +116,7 @@ import com.cloud.utils.db.Transaction;
import com.cloud.utils.exception.CSExceptionErrorCode;
import com.cloud.uuididentity.dao.IdentityDao;
public class ApiServer implements HttpRequestHandler {
private static final Logger s_logger = Logger.getLogger(ApiServer.class.getName());
private static final Logger s_accessLogger = Logger.getLogger("apiserver." + ApiServer.class.getName());
@ -299,7 +300,7 @@ public class ApiServer implements HttpRequestHandler {
if (apiPort != null) {
ListenerThread listenerThread = new ListenerThread(this, apiPort);
listenerThread.start();
}
}
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@ -515,14 +516,9 @@ public class ApiServer implements HttpRequestHandler {
ctx.setAccountId(asyncCmd.getEntityOwnerId());
AsyncJobVO job = new AsyncJobVO();
job.setInstanceId((objectId == null) ? asyncCmd.getInstanceId() : objectId);
job.setInstanceType(asyncCmd.getInstanceType());
job.setUserId(callerUserId);
job.setAccountId(caller.getId());
job.setCmd(cmdObj.getClass().getName());
job.setCmdInfo(ApiGsonHelper.getBuilder().create().toJson(params));
Long instanceId = (objectId == null) ? asyncCmd.getInstanceId() : objectId;
AsyncJobVO job = new AsyncJobVO(callerUserId, caller.getId(), cmdObj.getClass().getName(),
ApiGsonHelper.getBuilder().create().toJson(params), instanceId, asyncCmd.getInstanceType());
long jobId = _asyncMgr.submitAsyncJob(job);

View File

@ -25,8 +25,9 @@ public interface AsyncJobManager extends Manager {
public AsyncJobExecutorContext getExecutorContext();
public AsyncJobVO getAsyncJob(long jobId);
public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId);
public AsyncJobVO getAsyncJob(long jobId);
public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId);
public List<? extends AsyncJob> findInstancePendingAsyncJobs(AsyncJob.Type instanceType, Long accountId);
public long submitAsyncJob(AsyncJobVO job);
@ -39,7 +40,7 @@ public interface AsyncJobManager extends Manager {
public void releaseSyncSource(AsyncJobExecutor executor);
public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId);
public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit);
/**
* Queries for the status or final result of an async job.

View File

@ -91,29 +91,30 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
private AccountManager _accountMgr;
private AccountDao _accountDao;
private AsyncJobDao _jobDao;
private long _jobExpireSeconds = 86400; // 1 day
private long _jobCancelThresholdSeconds = 3600; // 1 hour
private ApiDispatcher _dispatcher;
private final ScheduledExecutorService _heartbeatScheduler =
Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat"));
private ExecutorService _executor;
@Override
public AsyncJobExecutorContext getExecutorContext() {
return _context;
}
@Override
public AsyncJobVO getAsyncJob(long jobId) {
return _jobDao.findById(jobId);
}
@Override
public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId) {
return _jobDao.findInstancePendingAsyncJob(instanceType, instanceId);
private long _jobExpireSeconds = 86400; // 1 day
private long _jobCancelThresholdSeconds = 3600; // 1 hour (for cancelling the jobs blocking other jobs)
private ApiDispatcher _dispatcher;
private final ScheduledExecutorService _heartbeatScheduler =
Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat"));
private ExecutorService _executor;
@Override
public AsyncJobExecutorContext getExecutorContext() {
return _context;
}
@Override
public AsyncJobVO getAsyncJob(long jobId) {
return _jobDao.findById(jobId);
}
@Override
public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId) {
return _jobDao.findInstancePendingAsyncJob(instanceType, instanceId);
}
@Override
public List<AsyncJobVO> findInstancePendingAsyncJobs(AsyncJob.Type instanceType, Long accountId) {
return _jobDao.findInstancePendingAsyncJobs(instanceType, accountId);
@ -213,47 +214,47 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
job.setProcessStatus(processStatus);
if(resultObject != null) {
job.setResult(ApiSerializerHelper.toSerializedStringOld(resultObject));
}
job.setLastUpdated(DateUtil.currentGMTTime());
_jobDao.update(jobId, job);
txt.commit();
} catch(Exception e) {
s_logger.error("Unexpected exception while updating async job-" + jobId + " status: ", e);
txt.rollback();
}
}
}
job.setLastUpdated(DateUtil.currentGMTTime());
_jobDao.update(jobId, job);
txt.commit();
} catch(Exception e) {
s_logger.error("Unexpected exception while updating async job-" + jobId + " status: ", e);
txt.rollback();
}
}
@Override @DB
public void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Update async-job attachment, job-" + jobId + ", instanceType: " + instanceType +
", instanceId: " + instanceId);
}
Transaction txt = Transaction.currentTxn();
try {
txt.start();
@Override @DB
public void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Update async-job attachment, job-" + jobId + ", instanceType: " + instanceType +
", instanceId: " + instanceId);
}
AsyncJobVO job = _jobDao.createForUpdate();
//job.setInstanceType(instanceType);
job.setInstanceId(instanceId);
job.setLastUpdated(DateUtil.currentGMTTime());
_jobDao.update(jobId, job);
Transaction txt = Transaction.currentTxn();
try {
txt.start();
txt.commit();
} catch(Exception e) {
s_logger.error("Unexpected exception while updating async job-" + jobId + " attachment: ", e);
txt.rollback();
}
}
AsyncJobVO job = _jobDao.createForUpdate();
//job.setInstanceType(instanceType);
job.setInstanceId(instanceId);
job.setLastUpdated(DateUtil.currentGMTTime());
_jobDao.update(jobId, job);
@Override
public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId) {
// This method is re-entrant. If an API developer wants to synchronized on an object, e.g. the router,
// when executing business logic, they will call this method (actually a method in BaseAsyncCmd that calls this).
// This method will get called every time their business logic executes. The first time it exectues for a job
// there will be no sync source, but on subsequent execution there will be a sync souce. If this is the first
// time the job executes we queue the job, otherwise we just return so that the business logic can execute.
txt.commit();
} catch(Exception e) {
s_logger.error("Unexpected exception while updating async job-" + jobId + " attachment: ", e);
txt.rollback();
}
}
@Override
public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit) {
// This method is re-entrant. If an API developer wants to synchronized on an object, e.g. the router,
// when executing business logic, they will call this method (actually a method in BaseAsyncCmd that calls this).
// This method will get called every time their business logic executes. The first time it exectues for a job
// there will be no sync source, but on subsequent execution there will be a sync souce. If this is the first
// time the job executes we queue the job, otherwise we just return so that the business logic can execute.
if (job.getSyncSource() != null) {
return;
}
@ -268,9 +269,9 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
// we retry five times until we throw an exception
Random random = new Random();
for(int i = 0; i < 5; i++) {
queue = _queueMgr.queue(syncObjType, syncObjId, "AsyncJob", job.getId());
if(queue != null) {
for(int i = 0; i < 5; i++) {
queue = _queueMgr.queue(syncObjType, syncObjId, "AsyncJob", job.getId(), queueSizeLimit);
if(queue != null) {
break;
}
@ -663,41 +664,41 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
if(s_logger.isInfoEnabled()) {
s_logger.info("Discard left-over queue item: " + item.toString());
}
String contentType = item.getContentType();
if(contentType != null && contentType.equals("AsyncJob")) {
Long jobId = item.getContentId();
if(jobId != null) {
s_logger.warn("Mark job as failed as its correspoding queue-item has been discarded. job id: " + jobId);
completeAsyncJob(jobId, AsyncJobResult.STATUS_FAILED, 0, getResetResultResponse("Execution was cancelled because of server shutdown"));
}
}
_queueMgr.purgeItem(item.getId());
}
}
}
@Override
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
_name = name;
ComponentLocator locator = ComponentLocator.getCurrentLocator();
ConfigurationDao configDao = locator.getDao(ConfigurationDao.class);
if (configDao == null) {
throw new ConfigurationException("Unable to get the configuration dao.");
}
int expireMinutes = NumbersUtil.parseInt(
configDao.getValue(Config.JobExpireMinutes.key()), 24*60);
_jobExpireSeconds = (long)expireMinutes*60;
_jobCancelThresholdSeconds = NumbersUtil.parseInt(
configDao.getValue(Config.JobCancelThresholdMinutes.key()), 60);
_jobCancelThresholdSeconds *= 60;
String contentType = item.getContentType();
if(contentType != null && contentType.equals("AsyncJob")) {
Long jobId = item.getContentId();
if(jobId != null) {
s_logger.warn("Mark job as failed as its correspoding queue-item has been discarded. job id: " + jobId);
completeAsyncJob(jobId, AsyncJobResult.STATUS_FAILED, 0, getResetResultResponse("Execution was cancelled because of server shutdown"));
}
}
_queueMgr.purgeItem(item.getId());
}
}
}
_accountDao = locator.getDao(AccountDao.class);
if (_accountDao == null) {
@Override
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
_name = name;
ComponentLocator locator = ComponentLocator.getCurrentLocator();
ConfigurationDao configDao = locator.getDao(ConfigurationDao.class);
if (configDao == null) {
throw new ConfigurationException("Unable to get the configuration dao.");
}
int expireMinutes = NumbersUtil.parseInt(
configDao.getValue(Config.JobExpireMinutes.key()), 24*60);
_jobExpireSeconds = (long)expireMinutes*60;
_jobCancelThresholdSeconds = NumbersUtil.parseInt(
configDao.getValue(Config.JobCancelThresholdMinutes.key()), 60);
_jobCancelThresholdSeconds *= 60;
_accountDao = locator.getDao(AccountDao.class);
if (_accountDao == null) {
throw new ConfigurationException("Unable to get " + AccountDao.class.getName());
}
_jobDao = locator.getDao(AsyncJobDao.class);
@ -748,23 +749,22 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
}
@Override
public void onManagementNodeLeft(List<ManagementServerHostVO> nodeList, long selfNodeId) {
for(ManagementServerHostVO msHost : nodeList) {
Transaction txn = Transaction.open(Transaction.CLOUD_DB);
try {
txn.start();
List<SyncQueueItemVO> items = _queueMgr.getActiveQueueItems(msHost.getId(), true);
cleanupPendingJobs(items);
_queueMgr.resetQueueProcess(msHost.getId());
_jobDao.resetJobProcess(msHost.getId(), BaseCmd.INTERNAL_ERROR, getSerializedErrorMessage("job cancelled because of management server restart"));
txn.commit();
} catch(Throwable e) {
s_logger.warn("Unexpected exception ", e);
txn.rollback();
} finally {
txn.close();
}
}
public void onManagementNodeLeft(List<ManagementServerHostVO> nodeList, long selfNodeId) {
for(ManagementServerHostVO msHost : nodeList) {
Transaction txn = Transaction.open(Transaction.CLOUD_DB);
try {
txn.start();
List<SyncQueueItemVO> items = _queueMgr.getActiveQueueItems(msHost.getId(), true);
cleanupPendingJobs(items);
_jobDao.resetJobProcess(msHost.getId(), BaseCmd.INTERNAL_ERROR, getSerializedErrorMessage("job cancelled because of management server restart"));
txn.commit();
} catch(Throwable e) {
s_logger.warn("Unexpected exception ", e);
txn.rollback();
} finally {
txn.close();
}
}
}
@Override
@ -773,21 +773,20 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
@Override
public boolean start() {
try {
List<SyncQueueItemVO> l = _queueMgr.getActiveQueueItems(getMsid(), false);
cleanupPendingJobs(l);
_queueMgr.resetQueueProcess(getMsid());
_jobDao.resetJobProcess(getMsid(), BaseCmd.INTERNAL_ERROR, getSerializedErrorMessage("job cancelled because of management server restart"));
} catch(Throwable e) {
s_logger.error("Unexpected exception " + e.getMessage(), e);
}
_heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), HEARTBEAT_INTERVAL,
HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
_heartbeatScheduler.scheduleAtFixedRate(getGCTask(), GC_INTERVAL,
GC_INTERVAL, TimeUnit.MILLISECONDS);
return true;
try {
List<SyncQueueItemVO> l = _queueMgr.getActiveQueueItems(getMsid(), false);
cleanupPendingJobs(l);
_jobDao.resetJobProcess(getMsid(), BaseCmd.INTERNAL_ERROR, getSerializedErrorMessage("job cancelled because of management server restart"));
} catch(Throwable e) {
s_logger.error("Unexpected exception " + e.getMessage(), e);
}
_heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), HEARTBEAT_INTERVAL,
HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
_heartbeatScheduler.scheduleAtFixedRate(getGCTask(), GC_INTERVAL,
GC_INTERVAL, TimeUnit.MILLISECONDS);
return true;
}
private static ExceptionResponse getResetResultResponse(String errorMessage) {

View File

@ -20,14 +20,14 @@ import java.util.List;
import com.cloud.utils.component.Manager;
public interface SyncQueueManager extends Manager {
public SyncQueueVO queue(String syncObjType, long syncObjId, String itemType, long itemId);
public SyncQueueItemVO dequeueFromOne(long queueId, Long msid);
public List<SyncQueueItemVO> dequeueFromAny(Long msid, int maxItems);
public void purgeItem(long queueItemId);
public interface SyncQueueManager extends Manager {
public SyncQueueVO queue(String syncObjType, long syncObjId, String itemType, long itemId, long queueSizeLimit);
public SyncQueueItemVO dequeueFromOne(long queueId, Long msid);
public List<SyncQueueItemVO> dequeueFromAny(Long msid, int maxItems);
public void purgeItem(long queueItemId);
public void returnItem(long queueItemId);
public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive);
public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive);
public void resetQueueProcess(long msid);
}
}

View File

@ -34,165 +34,170 @@ import com.cloud.utils.db.DB;
import com.cloud.utils.db.Transaction;
import com.cloud.utils.exception.CloudRuntimeException;
@Local(value={SyncQueueManager.class})
public class SyncQueueManagerImpl implements SyncQueueManager {
public static final Logger s_logger = Logger.getLogger(SyncQueueManagerImpl.class.getName());
private String _name;
private SyncQueueDao _syncQueueDao;
private SyncQueueItemDao _syncQueueItemDao;
@Local(value={SyncQueueManager.class})
public class SyncQueueManagerImpl implements SyncQueueManager {
public static final Logger s_logger = Logger.getLogger(SyncQueueManagerImpl.class.getName());
private String _name;
private SyncQueueDao _syncQueueDao;
private SyncQueueItemDao _syncQueueItemDao;
@Override
@DB
public SyncQueueVO queue(String syncObjType, long syncObjId, String itemType, long itemId, long queueSizeLimit) {
Transaction txn = Transaction.currentTxn();
try {
txn.start();
_syncQueueDao.ensureQueue(syncObjType, syncObjId);
SyncQueueVO queueVO = _syncQueueDao.find(syncObjType, syncObjId);
if(queueVO == null)
throw new CloudRuntimeException("Unable to queue item into DB, DB is full?");
@Override
@DB
public SyncQueueVO queue(String syncObjType, long syncObjId, String itemType, long itemId) {
Transaction txn = Transaction.currentTxn();
try {
txn.start();
_syncQueueDao.ensureQueue(syncObjType, syncObjId);
SyncQueueVO queueVO = _syncQueueDao.find(syncObjType, syncObjId);
if(queueVO == null)
throw new CloudRuntimeException("Unable to queue item into DB, DB is full?");
Date dt = DateUtil.currentGMTTime();
SyncQueueItemVO item = new SyncQueueItemVO();
item.setQueueId(queueVO.getId());
item.setContentType(itemType);
item.setContentId(itemId);
item.setCreated(dt);
_syncQueueItemDao.persist(item);
txn.commit();
return queueVO;
} catch(Exception e) {
s_logger.error("Unexpected exception: ", e);
txn.rollback();
}
return null;
}
@Override
@DB
public SyncQueueItemVO dequeueFromOne(long queueId, Long msid) {
Transaction txt = Transaction.currentTxn();
try {
txt.start();
SyncQueueVO queueVO = _syncQueueDao.lockRow(queueId, true);
if(queueVO == null) {
s_logger.error("Sync queue(id: " + queueId + ") does not exist");
txt.commit();
return null;
}
if(queueVO.getLastProcessTime() == null) {
SyncQueueItemVO itemVO = _syncQueueItemDao.getNextQueueItem(queueVO.getId());
if(itemVO != null) {
Long processNumber = queueVO.getLastProcessNumber();
if(processNumber == null)
processNumber = new Long(1);
else
processNumber = processNumber + 1;
Date dt = DateUtil.currentGMTTime();
queueVO.setLastProcessMsid(msid);
queueVO.setLastProcessNumber(processNumber);
queueVO.setLastProcessTime(dt);
queueVO.setQueueSizeLimit(queueSizeLimit);
_syncQueueDao.update(queueVO.getId(), queueVO);
Date dt = DateUtil.currentGMTTime();
SyncQueueItemVO item = new SyncQueueItemVO();
item.setQueueId(queueVO.getId());
item.setContentType(itemType);
item.setContentId(itemId);
item.setCreated(dt);
_syncQueueItemDao.persist(item);
txn.commit();
return queueVO;
} catch(Exception e) {
s_logger.error("Unexpected exception: ", e);
txn.rollback();
}
return null;
}
@Override
@DB
public SyncQueueItemVO dequeueFromOne(long queueId, Long msid) {
Transaction txt = Transaction.currentTxn();
try {
txt.start();
SyncQueueVO queueVO = _syncQueueDao.lockRow(queueId, true);
if(queueVO == null) {
s_logger.error("Sync queue(id: " + queueId + ") does not exist");
txt.commit();
return null;
}
if(queueReadyToProcess(queueVO)) {
SyncQueueItemVO itemVO = _syncQueueItemDao.getNextQueueItem(queueVO.getId());
if(itemVO != null) {
Long processNumber = queueVO.getLastProcessNumber();
if(processNumber == null)
processNumber = new Long(1);
else
processNumber = processNumber + 1;
Date dt = DateUtil.currentGMTTime();
queueVO.setLastProcessNumber(processNumber);
queueVO.setLastUpdated(dt);
_syncQueueDao.update(queueVO.getId(), queueVO);
itemVO.setLastProcessMsid(msid);
queueVO.setQueueSize(queueVO.getQueueSize() + 1);
_syncQueueDao.update(queueVO.getId(), queueVO);
itemVO.setLastProcessMsid(msid);
itemVO.setLastProcessNumber(processNumber);
_syncQueueItemDao.update(itemVO.getId(), itemVO);
txt.commit();
return itemVO;
} else {
if(s_logger.isDebugEnabled())
s_logger.debug("Sync queue (" + queueId + ") is currently empty");
}
} else {
if(s_logger.isDebugEnabled())
s_logger.debug("There is a pending process in sync queue(id: " + queueId + ")");
}
txt.commit();
} catch(Exception e) {
s_logger.error("Unexpected exception: ", e);
txt.rollback();
}
return null;
}
@Override
@DB
public List<SyncQueueItemVO> dequeueFromAny(Long msid, int maxItems) {
List<SyncQueueItemVO> resultList = new ArrayList<SyncQueueItemVO>();
Transaction txt = Transaction.currentTxn();
try {
txt.start();
List<SyncQueueItemVO> l = _syncQueueItemDao.getNextQueueItems(maxItems);
if(l != null && l.size() > 0) {
for(SyncQueueItemVO item : l) {
SyncQueueVO queueVO = _syncQueueDao.lockRow(item.getQueueId(), true);
SyncQueueItemVO itemVO = _syncQueueItemDao.lockRow(item.getId(), true);
if(queueVO.getLastProcessTime() == null && itemVO.getLastProcessNumber() == null) {
Long processNumber = queueVO.getLastProcessNumber();
if(processNumber == null)
processNumber = new Long(1);
else
processNumber = processNumber + 1;
Date dt = DateUtil.currentGMTTime();
queueVO.setLastProcessMsid(msid);
itemVO.setLastProcessTime(dt);
_syncQueueItemDao.update(itemVO.getId(), itemVO);
txt.commit();
return itemVO;
} else {
if(s_logger.isDebugEnabled())
s_logger.debug("Sync queue (" + queueId + ") is currently empty");
}
} else {
if(s_logger.isDebugEnabled())
s_logger.debug("There is a pending process in sync queue(id: " + queueId + ")");
}
txt.commit();
} catch(Exception e) {
s_logger.error("Unexpected exception: ", e);
txt.rollback();
}
return null;
}
@Override
@DB
public List<SyncQueueItemVO> dequeueFromAny(Long msid, int maxItems) {
List<SyncQueueItemVO> resultList = new ArrayList<SyncQueueItemVO>();
Transaction txt = Transaction.currentTxn();
try {
txt.start();
List<SyncQueueItemVO> l = _syncQueueItemDao.getNextQueueItems(maxItems);
if(l != null && l.size() > 0) {
for(SyncQueueItemVO item : l) {
SyncQueueVO queueVO = _syncQueueDao.lockRow(item.getQueueId(), true);
SyncQueueItemVO itemVO = _syncQueueItemDao.lockRow(item.getId(), true);
if(queueReadyToProcess(queueVO) && itemVO.getLastProcessNumber() == null) {
Long processNumber = queueVO.getLastProcessNumber();
if(processNumber == null)
processNumber = new Long(1);
else
processNumber = processNumber + 1;
Date dt = DateUtil.currentGMTTime();
queueVO.setLastProcessNumber(processNumber);
queueVO.setLastProcessTime(dt);
queueVO.setLastUpdated(dt);
_syncQueueDao.update(queueVO.getId(), queueVO);
itemVO.setLastProcessMsid(msid);
queueVO.setQueueSize(queueVO.getQueueSize() + 1);
_syncQueueDao.update(queueVO.getId(), queueVO);
itemVO.setLastProcessMsid(msid);
itemVO.setLastProcessNumber(processNumber);
_syncQueueItemDao.update(item.getId(), itemVO);
resultList.add(item);
}
}
}
txt.commit();
return resultList;
} catch(Exception e) {
s_logger.error("Unexpected exception: ", e);
txt.rollback();
}
return null;
}
@Override
@DB
public void purgeItem(long queueItemId) {
Transaction txt = Transaction.currentTxn();
try {
txt.start();
SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId);
if(itemVO != null) {
SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true);
_syncQueueItemDao.expunge(itemVO.getId());
queueVO.setLastProcessTime(null);
itemVO.setLastProcessTime(dt);
_syncQueueItemDao.update(item.getId(), itemVO);
resultList.add(item);
}
}
}
txt.commit();
return resultList;
} catch(Exception e) {
s_logger.error("Unexpected exception: ", e);
txt.rollback();
}
return null;
}
@Override
@DB
public void purgeItem(long queueItemId) {
Transaction txt = Transaction.currentTxn();
try {
txt.start();
SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId);
if(itemVO != null) {
SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true);
_syncQueueItemDao.expunge(itemVO.getId());
queueVO.setLastUpdated(DateUtil.currentGMTTime());
_syncQueueDao.update(queueVO.getId(), queueVO);
}
txt.commit();
} catch(Exception e) {
s_logger.error("Unexpected exception: ", e);
txt.rollback();
}
//decrement the count
assert (queueVO.getQueueSize() > 0) : "Count reduce happens when it's already <= 0!";
queueVO.setQueueSize(queueVO.getQueueSize() - 1);
_syncQueueDao.update(queueVO.getId(), queueVO);
}
txt.commit();
} catch(Exception e) {
s_logger.error("Unexpected exception: ", e);
txt.rollback();
}
}
@Override
@ -208,9 +213,9 @@ public class SyncQueueManagerImpl implements SyncQueueManager {
itemVO.setLastProcessMsid(null);
itemVO.setLastProcessNumber(null);
itemVO.setLastProcessTime(null);
_syncQueueItemDao.update(queueItemId, itemVO);
queueVO.setLastProcessTime(null);
queueVO.setLastUpdated(DateUtil.currentGMTTime());
_syncQueueDao.update(queueVO.getId(), queueVO);
}
@ -231,44 +236,42 @@ public class SyncQueueManagerImpl implements SyncQueueManager {
return _syncQueueItemDao.getBlockedQueueItems(thresholdMs, exclusive);
}
@Override
public void resetQueueProcess(long msid) {
_syncQueueDao.resetQueueProcessing(msid);
}
@Override
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
_name = name;
@Override
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
_name = name;
ComponentLocator locator = ComponentLocator.getCurrentLocator();
_syncQueueDao = locator.getDao(SyncQueueDao.class);
if (_syncQueueDao == null) {
throw new ConfigurationException("Unable to get "
+ SyncQueueDao.class.getName());
_syncQueueDao = locator.getDao(SyncQueueDao.class);
if (_syncQueueDao == null) {
throw new ConfigurationException("Unable to get "
+ SyncQueueDao.class.getName());
}
_syncQueueItemDao = locator.getDao(SyncQueueItemDao.class);
if (_syncQueueItemDao == null) {
throw new ConfigurationException("Unable to get "
+ SyncQueueDao.class.getName());
}
_syncQueueItemDao = locator.getDao(SyncQueueItemDao.class);
if (_syncQueueItemDao == null) {
throw new ConfigurationException("Unable to get "
+ SyncQueueDao.class.getName());
}
return true;
return true;
}
@Override
public boolean start() {
return true;
}
@Override
public boolean stop() {
return true;
}
@Override
public String getName() {
return _name;
}
@Override
public boolean start() {
return true;
}
@Override
public boolean stop() {
return true;
}
@Override
public String getName() {
return _name;
}
}
private boolean queueReadyToProcess(SyncQueueVO queueVO) {
return queueVO.getQueueSize() < queueVO.getQueueSizeLimit();
}
}

View File

@ -22,5 +22,4 @@ import com.cloud.utils.db.GenericDao;
public interface SyncQueueDao extends GenericDao<SyncQueueVO, Long>{
public void ensureQueue(String syncObjType, long syncObjId);
public SyncQueueVO find(String syncObjType, long syncObjId);
public void resetQueueProcessing(long msid);
}
}

View File

@ -28,7 +28,6 @@ import org.apache.log4j.Logger;
import com.cloud.async.SyncQueueVO;
import com.cloud.utils.DateUtil;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
@ -43,8 +42,9 @@ public class SyncQueueDaoImpl extends GenericDaoBase<SyncQueueVO, Long> implemen
@Override
public void ensureQueue(String syncObjType, long syncObjId) {
Date dt = DateUtil.currentGMTTime();
String sql = "INSERT IGNORE INTO sync_queue(sync_objtype, sync_objid, created, last_updated) values(?, ?, ?, ?)";
String sql = "INSERT IGNORE INTO sync_queue(sync_objtype, sync_objid, created, last_updated)" +
" values(?, ?, ?, ?)";
Transaction txn = Transaction.currentTxn();
PreparedStatement pstmt = null;
try {
@ -60,7 +60,7 @@ public class SyncQueueDaoImpl extends GenericDaoBase<SyncQueueVO, Long> implemen
s_logger.warn("Unable to create sync queue " + syncObjType + "-" + syncObjId + ":" + e.getMessage(), e);
}
}
@Override
public SyncQueueVO find(String syncObjType, long syncObjId) {
SearchCriteria<SyncQueueVO> sc = TypeIdSearch.create();
@ -69,23 +69,6 @@ public class SyncQueueDaoImpl extends GenericDaoBase<SyncQueueVO, Long> implemen
return findOneBy(sc);
}
@Override @DB
public void resetQueueProcessing(long msid) {
String sql = "UPDATE sync_queue set queue_proc_msid=NULL, queue_proc_time=NULL where queue_proc_msid=?";
Transaction txn = Transaction.currentTxn();
PreparedStatement pstmt = null;
try {
pstmt = txn.prepareAutoCloseStatement(sql);
pstmt.setLong(1, msid);
pstmt.execute();
} catch (SQLException e) {
s_logger.warn("Unable to reset sync queue for management server " + msid, e);
} catch (Throwable e) {
s_logger.warn("Unable to reset sync queue for management server " + msid, e);
}
}
protected SyncQueueDaoImpl() {
super();
TypeIdSearch = createSearchBuilder();

View File

@ -30,11 +30,9 @@ import javax.ejb.Local;
import org.apache.log4j.Logger;
import com.cloud.async.SyncQueueItemVO;
import com.cloud.async.SyncQueueVO;
import com.cloud.utils.DateUtil;
import com.cloud.utils.db.Filter;
import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.JoinBuilder;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.Transaction;
@ -42,8 +40,6 @@ import com.cloud.utils.db.Transaction;
@Local(value = { SyncQueueItemDao.class })
public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long> implements SyncQueueItemDao {
private static final Logger s_logger = Logger.getLogger(SyncQueueItemDaoImpl.class);
private final SyncQueueDao _syncQueueDao = new SyncQueueDaoImpl();
@Override
@ -71,7 +67,7 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long>
String sql = "SELECT i.id, i.queue_id, i.content_type, i.content_id, i.created " +
" FROM sync_queue AS q JOIN sync_queue_item AS i ON q.id = i.queue_id " +
" WHERE q.queue_proc_time IS NULL AND i.queue_proc_number IS NULL " +
" WHERE q.queue_size < q.queue_size_limit AND i.queue_proc_number IS NULL " +
" GROUP BY q.id " +
" ORDER BY i.id " +
" LIMIT 0, ?";
@ -120,23 +116,18 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long>
@Override
public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive) {
Date cutTime = DateUtil.currentGMTTime();
cutTime = new Date(cutTime.getTime() - thresholdMs);
SearchBuilder<SyncQueueVO> sbQueue = _syncQueueDao.createSearchBuilder();
sbQueue.and("lastProcessTime", sbQueue.entity().getLastProcessTime(), SearchCriteria.Op.NNULL);
sbQueue.and("lastProcessTime2", sbQueue.entity().getLastProcessTime(), SearchCriteria.Op.LT);
SearchBuilder<SyncQueueItemVO> sbItem = createSearchBuilder();
sbItem.join("queueItemJoinQueue", sbQueue, sbQueue.entity().getId(), sbItem.entity().getQueueId(), JoinBuilder.JoinType.INNER);
sbItem.and("lastProcessMsid", sbItem.entity().getLastProcessMsid(), SearchCriteria.Op.NNULL);
sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessNumber(), SearchCriteria.Op.NNULL);
sbQueue.done();
sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.NNULL);
sbItem.and("lastProcessTime2", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.LT);
sbItem.done();
SearchCriteria<SyncQueueItemVO> sc = sbItem.create();
sc.setJoinParameters("queueItemJoinQueue", "lastProcessTime2", cutTime);
sc.setParameters("lastProcessTime2", new Date(cutTime.getTime() - thresholdMs));
if(exclusive)
return lockRows(sc, null, true);
return listBy(sc, null);

View File

@ -139,8 +139,7 @@ public enum Config {
// Advanced
JobExpireMinutes("Advanced", ManagementServer.class, String.class, "job.expire.minutes", "1440", "Time (in minutes) for async-jobs to be kept in system", null),
JobCancelThresholdMinutes("Advanced", ManagementServer.class, String.class, "job.cancel.threshold.minutes", "60", "Time (in minutes) for async-jobs to be forcely cancelled if it has been in process for long", null),
SwiftEnable(
"Advanced", ManagementServer.class, Boolean.class, "swift.enable", "false", "enable swift ", null),
SwiftEnable("Advanced", ManagementServer.class, Boolean.class, "swift.enable", "false", "enable swift ", null),
EventPurgeInterval("Advanced", ManagementServer.class, Integer.class, "event.purge.interval", "86400", "The interval (in seconds) to wait before running the event purge thread", null),
AccountCleanupInterval("Advanced", ManagementServer.class, Integer.class, "account.cleanup.interval", "86400", "The interval (in seconds) between cleanup for removed accounts", null),
@ -349,7 +348,10 @@ public enum Config {
SecondaryStorageServiceOffering("Advanced", ManagementServer.class, Long.class, "secstorage.service.offering", null, "Service offering used by secondary storage; if NULL - system offering will be used", null),
HaTag("Advanced", ManagementServer.class, String.class, "ha.tag", null, "HA tag defining that the host marked with this tag can be used for HA purposes only", null),
VpcCleanupInterval("Advanced", ManagementServer.class, Integer.class, "vpc.cleanup.interval", "3600", "The interval (in seconds) between cleanup for Inactive VPCs", null),
VpcMaxNetworks("Advanced", ManagementServer.class, Integer.class, "vpc.max.networks", "3", "Maximum number of networks per vpc", null);
VpcMaxNetworks("Advanced", ManagementServer.class, Integer.class, "vpc.max.networks", "3", "Maximum number of networks per vpc", null),
ConcurrentSnapshotsThresholdPerHost("Advanced", ManagementServer.class, String.class, "concurrent.snapshots.threshold.perhost",
"10", "Limits number of snapshots that can be handled by the host concurrently", null);
private final String _category;

View File

@ -195,15 +195,13 @@ public class SnapshotManagerImpl implements SnapshotManager, SnapshotService, Ma
protected Answer sendToPool(Volume vol, Command cmd) {
StoragePool pool = _storagePoolDao.findById(vol.getPoolId());
VMInstanceVO vm = _vmDao.findById(vol.getInstanceId());
long[] hostIdsToTryFirst = null;
if (vm != null) {
if(vm.getHostId() != null) {
hostIdsToTryFirst = new long[] { vm.getHostId() };
} else if(vm.getLastHostId() != null) {
hostIdsToTryFirst = new long[] { vm.getLastHostId() };
}
Long vmHostId = getHostIdForSnapshotOperation(vol);
if (vmHostId != null) {
hostIdsToTryFirst = new long[] { vmHostId };
}
List<Long> hostIdsToAvoid = new ArrayList<Long>();
@ -235,6 +233,19 @@ public class SnapshotManagerImpl implements SnapshotManager, SnapshotService, Ma
return null;
}
@Override
public Long getHostIdForSnapshotOperation(Volume vol) {
VMInstanceVO vm = _vmDao.findById(vol.getInstanceId());
if (vm != null) {
if(vm.getHostId() != null) {
return vm.getHostId();
} else if(vm.getLastHostId() != null) {
return vm.getLastHostId();
}
}
return null;
}
@Override
public SnapshotVO createSnapshotOnPrimary(VolumeVO volume, Long policyId, Long snapshotId) {
SnapshotVO snapshot = _snapshotDao.findById(snapshotId);

View File

@ -28,6 +28,7 @@ import javax.naming.ConfigurationException;
import org.apache.log4j.Logger;
import com.cloud.api.ApiConstants;
import com.cloud.api.ApiDispatcher;
import com.cloud.api.ApiGsonHelper;
import com.cloud.api.commands.CreateSnapshotCmd;
@ -36,6 +37,7 @@ import com.cloud.async.AsyncJobManager;
import com.cloud.async.AsyncJobResult;
import com.cloud.async.AsyncJobVO;
import com.cloud.async.dao.AsyncJobDao;
import com.cloud.configuration.Config;
import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.event.EventTypes;
import com.cloud.event.EventUtils;
@ -235,10 +237,10 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler {
EventTypes.EVENT_SNAPSHOT_CREATE, "creating snapshot for volume Id:"+volumeId,0);
Map<String, String> params = new HashMap<String, String>();
params.put("volumeid", ""+volumeId);
params.put("policyid", ""+policyId);
params.put(ApiConstants.VOLUME_ID, "" + volumeId);
params.put(ApiConstants.POLICY_ID, "" + policyId);
params.put("ctxUserId", "1");
params.put("ctxAccountId", "1");
params.put("ctxAccountId", "" + volume.getAccountId());
params.put("ctxStartEventId", String.valueOf(eventId));
CreateSnapshotCmd cmd = new CreateSnapshotCmd();
@ -246,15 +248,10 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler {
params.put("id", ""+cmd.getEntityId());
params.put("ctxStartEventId", "1");
AsyncJobVO job = new AsyncJobVO();
job.setUserId(userId);
// Just have SYSTEM own the job for now. Users won't be able to see this job, but
// it's an internal job so probably not a huge deal.
job.setAccountId(1L);
job.setCmd(CreateSnapshotCmd.class.getName());
job.setInstanceId(cmd.getEntityId());
job.setCmdInfo(ApiGsonHelper.getBuilder().create().toJson(params));
AsyncJobVO job = new AsyncJobVO(User.UID_SYSTEM, volume.getAccountId(), CreateSnapshotCmd.class.getName(),
ApiGsonHelper.getBuilder().create().toJson(params), cmd.getEntityId(),
cmd.getInstanceType());
long jobId = _asyncMgr.submitAsyncJob(job);
tmpSnapshotScheduleVO.setAsyncJobId(jobId);
@ -359,6 +356,7 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler {
_testTimerTask = new TestClock(this, minutesPerHour, hoursPerDay, daysPerWeek, daysPerMonth, weeksPerMonth, monthsPerYear);
}
_currentTimestamp = new Date();
s_logger.info("Snapshot Scheduler is configured.");
return true;

View File

@ -32,178 +32,178 @@ public class TestSyncQueueManager extends ComponentTestCase {
private volatile int count = 0;
private volatile long expectingCurrent = 1;
public void leftOverItems() {
SyncQueueManager mgr = ComponentLocator.getCurrentLocator().getManager(
SyncQueueManager.class);
List<SyncQueueItemVO> l = mgr.getActiveQueueItems(1L, false);
if(l != null && l.size() > 0) {
for(SyncQueueItemVO item : l) {
s_logger.info("Left over item: " + item.toString());
mgr.purgeItem(item.getId());
}
}
}
public void dequeueFromOneQueue() {
final SyncQueueManager mgr = ComponentLocator.getCurrentLocator().getManager(
SyncQueueManager.class);
final int totalRuns = 5000;
final SyncQueueVO queue = mgr.queue("vm_instance", 1L, "Async-job", 1);
for(int i = 1; i < totalRuns; i++)
mgr.queue("vm_instance", 1L, "Async-job", i+1);
count = 0;
expectingCurrent = 1;
Thread thread1 = new Thread(new Runnable() {
public void run() {
while(count < totalRuns) {
SyncQueueItemVO item = mgr.dequeueFromOne(queue.getId(), 1L);
if(item != null) {
s_logger.info("Thread 1 process item: " + item.toString());
Assert.assertEquals(expectingCurrent, item.getContentId().longValue());
expectingCurrent++;
count++;
mgr.purgeItem(item.getId());
}
try {
Thread.sleep(getRandomMilliseconds(1, 10));
} catch (InterruptedException e) {
}
}
}
}
);
Thread thread2 = new Thread(new Runnable() {
public void run() {
while(count < totalRuns) {
SyncQueueItemVO item = mgr.dequeueFromOne(queue.getId(), 1L);
if(item != null) {
s_logger.info("Thread 2 process item: " + item.toString());
Assert.assertEquals(expectingCurrent, item.getContentId().longValue());
expectingCurrent++;
count++;
mgr.purgeItem(item.getId());
}
try {
Thread.sleep(getRandomMilliseconds(1, 10));
} catch (InterruptedException e) {
}
}
}
}
);
thread1.start();
thread2.start();
try {
thread1.join();
} catch (InterruptedException e) {
}
try {
thread2.join();
} catch (InterruptedException e) {
}
Assert.assertEquals(totalRuns, count);
}
public void dequeueFromAnyQueue() {
final SyncQueueManager mgr = ComponentLocator.getCurrentLocator().getManager(
SyncQueueManager.class);
// simulate 30 queues
final int queues = 30;
final int totalRuns = 100;
final int itemsPerRun = 20;
for(int q = 1; q <= queues; q++)
for(int i = 0; i < totalRuns; i++)
mgr.queue("vm_instance", q, "Async-job", i+1);
count = 0;
Thread thread1 = new Thread(new Runnable() {
public void run() {
while(count < totalRuns*queues) {
List<SyncQueueItemVO> l = mgr.dequeueFromAny(1L, itemsPerRun);
if(l != null && l.size() > 0) {
s_logger.info("Thread 1 get " + l.size() + " dequeued items");
for(SyncQueueItemVO item : l) {
s_logger.info("Thread 1 process item: " + item.toString());
count++;
mgr.purgeItem(item.getId());
}
}
try {
Thread.sleep(getRandomMilliseconds(1, 10));
} catch (InterruptedException e) {
}
}
}
}
);
Thread thread2 = new Thread(new Runnable() {
public void run() {
while(count < totalRuns*queues) {
List<SyncQueueItemVO> l = mgr.dequeueFromAny(1L, itemsPerRun);
if(l != null && l.size() > 0) {
s_logger.info("Thread 2 get " + l.size() + " dequeued items");
for(SyncQueueItemVO item : l) {
s_logger.info("Thread 2 process item: " + item.toString());
count++;
mgr.purgeItem(item.getId());
}
}
try {
Thread.sleep(getRandomMilliseconds(1, 10));
} catch (InterruptedException e) {
}
}
}
}
);
thread1.start();
thread2.start();
try {
thread1.join();
} catch (InterruptedException e) {
}
try {
thread2.join();
} catch (InterruptedException e) {
}
Assert.assertEquals(queues*totalRuns, count);
}
public void testPopulateQueueData() {
final int queues = 30000;
final int totalRuns = 100;
final SyncQueueManager mgr = ComponentLocator.getCurrentLocator().getManager(
SyncQueueManager.class);
for(int q = 1; q <= queues; q++)
for(int i = 0; i < totalRuns; i++)
mgr.queue("vm_instance", q, "Async-job", i+1);
public void leftOverItems() {
SyncQueueManager mgr = ComponentLocator.getCurrentLocator().getManager(
SyncQueueManager.class);
List<SyncQueueItemVO> l = mgr.getActiveQueueItems(1L, false);
if(l != null && l.size() > 0) {
for(SyncQueueItemVO item : l) {
s_logger.info("Left over item: " + item.toString());
mgr.purgeItem(item.getId());
}
}
}
public void dequeueFromOneQueue() {
final SyncQueueManager mgr = ComponentLocator.getCurrentLocator().getManager(
SyncQueueManager.class);
final int totalRuns = 5000;
final SyncQueueVO queue = mgr.queue("vm_instance", 1L, "Async-job", 1, 1);
for(int i = 1; i < totalRuns; i++)
mgr.queue("vm_instance", 1L, "Async-job", i+1, 1);
count = 0;
expectingCurrent = 1;
Thread thread1 = new Thread(new Runnable() {
public void run() {
while(count < totalRuns) {
SyncQueueItemVO item = mgr.dequeueFromOne(queue.getId(), 1L);
if(item != null) {
s_logger.info("Thread 1 process item: " + item.toString());
Assert.assertEquals(expectingCurrent, item.getContentId().longValue());
expectingCurrent++;
count++;
mgr.purgeItem(item.getId());
}
try {
Thread.sleep(getRandomMilliseconds(1, 10));
} catch (InterruptedException e) {
}
}
}
}
);
Thread thread2 = new Thread(new Runnable() {
public void run() {
while(count < totalRuns) {
SyncQueueItemVO item = mgr.dequeueFromOne(queue.getId(), 1L);
if(item != null) {
s_logger.info("Thread 2 process item: " + item.toString());
Assert.assertEquals(expectingCurrent, item.getContentId().longValue());
expectingCurrent++;
count++;
mgr.purgeItem(item.getId());
}
try {
Thread.sleep(getRandomMilliseconds(1, 10));
} catch (InterruptedException e) {
}
}
}
}
);
thread1.start();
thread2.start();
try {
thread1.join();
} catch (InterruptedException e) {
}
try {
thread2.join();
} catch (InterruptedException e) {
}
Assert.assertEquals(totalRuns, count);
}
public void dequeueFromAnyQueue() {
final SyncQueueManager mgr = ComponentLocator.getCurrentLocator().getManager(
SyncQueueManager.class);
// simulate 30 queues
final int queues = 30;
final int totalRuns = 100;
final int itemsPerRun = 20;
for(int q = 1; q <= queues; q++)
for(int i = 0; i < totalRuns; i++)
mgr.queue("vm_instance", q, "Async-job", i+1, 1);
count = 0;
Thread thread1 = new Thread(new Runnable() {
public void run() {
while(count < totalRuns*queues) {
List<SyncQueueItemVO> l = mgr.dequeueFromAny(1L, itemsPerRun);
if(l != null && l.size() > 0) {
s_logger.info("Thread 1 get " + l.size() + " dequeued items");
for(SyncQueueItemVO item : l) {
s_logger.info("Thread 1 process item: " + item.toString());
count++;
mgr.purgeItem(item.getId());
}
}
try {
Thread.sleep(getRandomMilliseconds(1, 10));
} catch (InterruptedException e) {
}
}
}
}
);
Thread thread2 = new Thread(new Runnable() {
public void run() {
while(count < totalRuns*queues) {
List<SyncQueueItemVO> l = mgr.dequeueFromAny(1L, itemsPerRun);
if(l != null && l.size() > 0) {
s_logger.info("Thread 2 get " + l.size() + " dequeued items");
for(SyncQueueItemVO item : l) {
s_logger.info("Thread 2 process item: " + item.toString());
count++;
mgr.purgeItem(item.getId());
}
}
try {
Thread.sleep(getRandomMilliseconds(1, 10));
} catch (InterruptedException e) {
}
}
}
}
);
thread1.start();
thread2.start();
try {
thread1.join();
} catch (InterruptedException e) {
}
try {
thread2.join();
} catch (InterruptedException e) {
}
Assert.assertEquals(queues*totalRuns, count);
}
public void testPopulateQueueData() {
final int queues = 30000;
final int totalRuns = 100;
final SyncQueueManager mgr = ComponentLocator.getCurrentLocator().getManager(
SyncQueueManager.class);
for(int q = 1; q <= queues; q++)
for(int i = 0; i < totalRuns; i++)
mgr.queue("vm_instance", q, "Async-job", i+1, 1);
}
public void testSyncQueue() {
final SyncQueueManager mgr = ComponentLocator.getCurrentLocator().getManager(
SyncQueueManager.class);
mgr.queue("vm_instance", 1, "Async-job", 1);
mgr.queue("vm_instance", 1, "Async-job", 2);
mgr.queue("vm_instance", 1, "Async-job", 3);
mgr.queue("vm_instance", 1, "Async-job", 1, 1);
mgr.queue("vm_instance", 1, "Async-job", 2, 1);
mgr.queue("vm_instance", 1, "Async-job", 3, 1);
mgr.dequeueFromAny(100L, 1);
List<SyncQueueItemVO> l = mgr.getBlockedQueueItems(100000, false);

View File

@ -1362,16 +1362,15 @@ CREATE TABLE `cloud`.`sync_queue` (
`id` bigint unsigned NOT NULL auto_increment,
`sync_objtype` varchar(64) NOT NULL,
`sync_objid` bigint unsigned NOT NULL,
`queue_proc_msid` bigint,
`queue_proc_number` bigint COMMENT 'process number, increase 1 for each iteration',
`queue_proc_time` datetime COMMENT 'last time to process the queue',
`created` datetime COMMENT 'date created',
`last_updated` datetime COMMENT 'date created',
`queue_size` smallint DEFAULT 0 COMMENT 'number of items being processed by the queue',
`queue_size_limit` smallint DEFAULT 1 COMMENT 'max number of items the queue can process concurrently',
PRIMARY KEY (`id`),
UNIQUE `i_sync_queue__objtype__objid`(`sync_objtype`, `sync_objid`),
INDEX `i_sync_queue__created`(`created`),
INDEX `i_sync_queue__last_updated`(`last_updated`),
INDEX `i_sync_queue__queue_proc_time`(`queue_proc_time`)
INDEX `i_sync_queue__last_updated`(`last_updated`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `cloud`.`stack_maid` (
@ -1392,13 +1391,15 @@ CREATE TABLE `cloud`.`sync_queue_item` (
`content_id` bigint,
`queue_proc_msid` bigint COMMENT 'owner msid when the queue item is being processed',
`queue_proc_number` bigint COMMENT 'used to distinguish raw items and items being in process',
`queue_proc_time` datetime COMMENT 'when processing started for the item',
`created` datetime COMMENT 'time created',
PRIMARY KEY (`id`),
CONSTRAINT `fk_sync_queue_item__queue_id` FOREIGN KEY `fk_sync_queue_item__queue_id` (`queue_id`) REFERENCES `sync_queue` (`id`) ON DELETE CASCADE,
INDEX `i_sync_queue_item__queue_id`(`queue_id`),
INDEX `i_sync_queue_item__created`(`created`),
INDEX `i_sync_queue_item__queue_proc_number`(`queue_proc_number`),
INDEX `i_sync_queue_item__queue_proc_msid`(`queue_proc_msid`)
INDEX `i_sync_queue_item__queue_proc_msid`(`queue_proc_msid`),
INDEX `i_sync_queue__queue_proc_time`(`queue_proc_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `cloud`.`disk_offering` (
@ -2371,4 +2372,3 @@ CREATE TABLE `cloud`.`nicira_nvp_nic_map` (
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
SET foreign_key_checks = 1;