diff --git a/api/src/com/cloud/api/BaseAsyncCmd.java b/api/src/com/cloud/api/BaseAsyncCmd.java index 2960e4878e9..cf6d0bdbb93 100644 --- a/api/src/com/cloud/api/BaseAsyncCmd.java +++ b/api/src/com/cloud/api/BaseAsyncCmd.java @@ -25,10 +25,11 @@ import com.cloud.user.UserContext; * queryAsyncJobResult API command. */ public abstract class BaseAsyncCmd extends BaseCmd { + public static final String ipAddressSyncObject = "ipaddress"; public static final String networkSyncObject = "network"; public static final String vpcSyncObject = "vpc"; - + public static final String snapshotHostSyncObject = "snapshothost"; private AsyncJob job; diff --git a/api/src/com/cloud/api/commands/CreateSnapshotCmd.java b/api/src/com/cloud/api/commands/CreateSnapshotCmd.java index d1b6d7ad41a..e78dbdabe67 100755 --- a/api/src/com/cloud/api/commands/CreateSnapshotCmd.java +++ b/api/src/com/cloud/api/commands/CreateSnapshotCmd.java @@ -19,6 +19,7 @@ package com.cloud.api.commands; import org.apache.log4j.Logger; import com.cloud.api.ApiConstants; +import com.cloud.api.BaseAsyncCmd; import com.cloud.api.BaseAsyncCreateCmd; import com.cloud.api.BaseCmd; import com.cloud.api.IdentityMapper; @@ -60,6 +61,8 @@ public class CreateSnapshotCmd extends BaseAsyncCreateCmd { @IdentityMapper(entityTableName="snapshot_policy") @Parameter(name = ApiConstants.POLICY_ID, type = CommandType.LONG, description = "policy id of the snapshot, if this is null, then use MANUAL_POLICY.") private Long policyId; + + private String syncObjectType = BaseAsyncCmd.snapshotHostSyncObject; // /////////////////////////////////////////////////// // ///////////////// Accessors /////////////////////// @@ -88,7 +91,16 @@ public class CreateSnapshotCmd extends BaseAsyncCreateCmd { return Snapshot.MANUAL_POLICY_ID; } } + + private Long getHostId() { + Volume volume = _entityMgr.findById(Volume.class, getVolumeId()); + if (volume == null) { + throw new InvalidParameterValueException("Unable to find volume by id"); + } + return _snapshotService.getHostIdForSnapshotOperation(volume); + } + // /////////////////////////////////////////////////// // ///////////// API Implementation/////////////////// // /////////////////////////////////////////////////// @@ -161,4 +173,21 @@ public class CreateSnapshotCmd extends BaseAsyncCreateCmd { throw new ServerApiException(BaseCmd.INTERNAL_ERROR, "Failed to create snapshot due to an internal error creating snapshot for volume " + volumeId); } } + + + @Override + public String getSyncObjType() { + if (getSyncObjId() != null) { + return syncObjectType; + } + return null; + } + + @Override + public Long getSyncObjId() { + if (getHostId() != null) { + return getHostId(); + } + return null; + } } diff --git a/api/src/com/cloud/storage/snapshot/SnapshotService.java b/api/src/com/cloud/storage/snapshot/SnapshotService.java index 0500061670a..0c0e9b6c7e2 100644 --- a/api/src/com/cloud/storage/snapshot/SnapshotService.java +++ b/api/src/com/cloud/storage/snapshot/SnapshotService.java @@ -26,6 +26,7 @@ import com.cloud.api.commands.ListSnapshotsCmd; import com.cloud.exception.PermissionDeniedException; import com.cloud.exception.ResourceAllocationException; import com.cloud.storage.Snapshot; +import com.cloud.storage.Volume; import com.cloud.user.Account; public interface SnapshotService { @@ -97,4 +98,10 @@ public interface SnapshotService { * @return the Snapshot that was created */ Snapshot createSnapshot(Long volumeId, Long policyId, Long snapshotId, Account snapshotOwner); + + /** + * @param vol + * @return + */ + Long getHostIdForSnapshotOperation(Volume vol); } diff --git a/core/src/com/cloud/async/AsyncJobVO.java b/core/src/com/cloud/async/AsyncJobVO.java index fc2cf5bccdf..742631367a3 100644 --- a/core/src/com/cloud/async/AsyncJobVO.java +++ b/core/src/com/cloud/async/AsyncJobVO.java @@ -119,28 +119,31 @@ public class AsyncJobVO implements AsyncJob { @Transient private boolean fromPreviousSession = false; - public AsyncJobVO() { - this.uuid = UUID.randomUUID().toString(); - } - public AsyncJobVO(long userId, long accountId, String cmd, String cmdInfo) { - this.userId = userId; - this.accountId = accountId; - this.cmd = cmd; - this.cmdInfo = cmdInfo; + public AsyncJobVO() { + this.uuid = UUID.randomUUID().toString(); + } + + public AsyncJobVO(long userId, long accountId, String cmd, String cmdInfo, Long instanceId, Type instanceType) { + this.userId = userId; + this.accountId = accountId; + this.cmd = cmd; + this.cmdInfo = cmdInfo; this.callbackType = CALLBACK_POLLING; this.uuid = UUID.randomUUID().toString(); - } - - public AsyncJobVO(long userId, long accountId, String cmd, String cmdInfo, - int callbackType, String callbackAddress) { - - this(userId, accountId, cmd, cmdInfo); - this.callbackType = callbackType; - this.callbackAddress = callbackAddress; + this.instanceId = instanceId; + } + + public AsyncJobVO(long userId, long accountId, String cmd, String cmdInfo, + int callbackType, String callbackAddress, Long instanceId, Type instanceType) { + + this(userId, accountId, cmd, cmdInfo, instanceId, instanceType); + this.callbackType = callbackType; + this.callbackAddress = callbackAddress; this.uuid = UUID.randomUUID().toString(); } - + + @Override public Long getId() { return id; diff --git a/core/src/com/cloud/async/SyncQueueItemVO.java b/core/src/com/cloud/async/SyncQueueItemVO.java index aceb3181d70..c3ef2f66711 100644 --- a/core/src/com/cloud/async/SyncQueueItemVO.java +++ b/core/src/com/cloud/async/SyncQueueItemVO.java @@ -24,6 +24,8 @@ import javax.persistence.GeneratedValue; import javax.persistence.GenerationType; import javax.persistence.Id; import javax.persistence.Table; +import javax.persistence.Temporal; +import javax.persistence.TemporalType; @Entity @Table(name="sync_queue_item") @@ -49,6 +51,10 @@ public class SyncQueueItemVO implements SyncQueueItem{ @Column(name="queue_proc_number") private Long lastProcessNumber; + @Column(name="queue_proc_time") + @Temporal(TemporalType.TIMESTAMP) + private Date lastProcessTime; + @Column(name="created") private Date created; @@ -93,7 +99,7 @@ public class SyncQueueItemVO implements SyncQueueItem{ public void setLastProcessMsid(Long lastProcessMsid) { this.lastProcessMsid = lastProcessMsid; } - + public Long getLastProcessNumber() { return lastProcessNumber; } @@ -109,7 +115,7 @@ public class SyncQueueItemVO implements SyncQueueItem{ public void setCreated(Date created) { this.created = created; } - + public String toString() { StringBuffer sb = new StringBuffer(); sb.append("SyncQueueItemVO {id:").append(getId()).append(", queueId: ").append(getQueueId()); @@ -117,8 +123,17 @@ public class SyncQueueItemVO implements SyncQueueItem{ sb.append(", contentId: ").append(getContentId()); sb.append(", lastProcessMsid: ").append(getLastProcessMsid()); sb.append(", lastprocessNumber: ").append(getLastProcessNumber()); + sb.append(", lastProcessTime: ").append(getLastProcessTime()); sb.append(", created: ").append(getCreated()); sb.append("}"); return sb.toString(); } + + public Date getLastProcessTime() { + return lastProcessTime; + } + + public void setLastProcessTime(Date lastProcessTime) { + this.lastProcessTime = lastProcessTime; + } } diff --git a/core/src/com/cloud/async/SyncQueueVO.java b/core/src/com/cloud/async/SyncQueueVO.java index 419fecba6a4..f70b320c786 100644 --- a/core/src/com/cloud/async/SyncQueueVO.java +++ b/core/src/com/cloud/async/SyncQueueVO.java @@ -18,7 +18,6 @@ package com.cloud.async; import java.util.Date; - import javax.persistence.Column; import javax.persistence.Entity; import javax.persistence.GeneratedValue; @@ -40,36 +39,31 @@ public class SyncQueueVO { @Column(name="sync_objtype") private String syncObjType; - + @Column(name="sync_objid") private Long syncObjId; - + @Column(name="queue_proc_number") private Long lastProcessNumber; - - @Column(name="queue_proc_time") - @Temporal(TemporalType.TIMESTAMP) - private Date lastProcessTime; - - @Column(name="queue_proc_msid") - private Long lastProcessMsid; - + @Column(name="created") @Temporal(TemporalType.TIMESTAMP) private Date created; - + @Column(name="last_updated") @Temporal(TemporalType.TIMESTAMP) private Date lastUpdated; - + + @Column(name="queue_size") + private long queueSize = 0; + + @Column(name="queue_size_limit") + private long queueSizeLimit = 0; + public Long getId() { return id; } - public void setId(Long id) { - this.id = id; - } - public String getSyncObjType() { return syncObjType; } @@ -94,22 +88,6 @@ public class SyncQueueVO { lastProcessNumber = number; } - public Date getLastProcessTime() { - return lastProcessTime; - } - - public void setLastProcessTime(Date lastProcessTime) { - this.lastProcessTime = lastProcessTime; - } - - public Long getLastProcessMsid() { - return lastProcessMsid; - } - - public void setLastProcessMsid(Long lastProcessMsid) { - this.lastProcessMsid = lastProcessMsid; - } - public Date getCreated() { return created; } @@ -125,18 +103,33 @@ public class SyncQueueVO { public void setLastUpdated(Date lastUpdated) { this.lastUpdated = lastUpdated; } - + public String toString() { StringBuffer sb = new StringBuffer(); sb.append("SyncQueueVO {id:").append(getId()); sb.append(", syncObjType: ").append(getSyncObjType()); sb.append(", syncObjId: ").append(getSyncObjId()); - sb.append(", lastProcessMsid: ").append(getLastProcessMsid()); sb.append(", lastProcessNumber: ").append(getLastProcessNumber()); - sb.append(", lastProcessTime: ").append(getLastProcessTime()); sb.append(", lastUpdated: ").append(getLastUpdated()); sb.append(", created: ").append(getCreated()); + sb.append(", count: ").append(getQueueSize()); sb.append("}"); return sb.toString(); } + + public long getQueueSize() { + return queueSize; + } + + public void setQueueSize(long queueSize) { + this.queueSize = queueSize; + } + + public long getQueueSizeLimit() { + return queueSizeLimit; + } + + public void setQueueSizeLimit(long queueSizeLimit) { + this.queueSizeLimit = queueSizeLimit; + } } diff --git a/server/src/com/cloud/api/ApiDispatcher.java b/server/src/com/cloud/api/ApiDispatcher.java index 8eade00060a..9061dda2275 100755 --- a/server/src/com/cloud/api/ApiDispatcher.java +++ b/server/src/com/cloud/api/ApiDispatcher.java @@ -33,6 +33,8 @@ import com.cloud.api.BaseCmd.CommandType; import com.cloud.api.commands.ListEventsCmd; import com.cloud.async.AsyncCommandQueued; import com.cloud.async.AsyncJobManager; +import com.cloud.configuration.Config; +import com.cloud.configuration.dao.ConfigurationDao; import com.cloud.exception.AccountLimitException; import com.cloud.exception.InsufficientCapacityException; import com.cloud.exception.InvalidParameterValueException; @@ -44,6 +46,8 @@ import com.cloud.server.ManagementServer; import com.cloud.user.Account; import com.cloud.user.UserContext; import com.cloud.utils.DateUtil; +import com.cloud.utils.IdentityProxy; +import com.cloud.utils.NumbersUtil; import com.cloud.utils.component.ComponentLocator; import com.cloud.utils.component.PluggableService; import com.cloud.utils.exception.CSExceptionErrorCode; @@ -59,6 +63,7 @@ public class ApiDispatcher { ComponentLocator _locator; AsyncJobManager _asyncMgr; IdentityDao _identityDao; + long _createSnapshotQueueSizeLimit; // singleton class private static ApiDispatcher s_instance = new ApiDispatcher(); @@ -71,6 +76,9 @@ public class ApiDispatcher { _locator = ComponentLocator.getLocator(ManagementServer.Name); _asyncMgr = _locator.getManager(AsyncJobManager.class); _identityDao = _locator.getDao(IdentityDao.class); + ConfigurationDao configDao = _locator.getDao(ConfigurationDao.class); + Map configs = configDao.getConfiguration(); + _createSnapshotQueueSizeLimit = NumbersUtil.parseInt(configs.get(Config.ConcurrentSnapshotsThresholdPerHost.key()), 10); } public void dispatchCreateCmd(BaseAsyncCreateCmd cmd, Map params) { @@ -130,8 +138,14 @@ public class ApiDispatcher { ctx.setStartEventId(Long.valueOf(startEventId)); // Synchronise job on the object if needed + if (asyncCmd.getJob() != null && asyncCmd.getSyncObjId() != null && asyncCmd.getSyncObjType() != null) { - _asyncMgr.syncAsyncJobExecution(asyncCmd.getJob(), asyncCmd.getSyncObjType(), asyncCmd.getSyncObjId().longValue()); + long queueSizeLimit = 1; + if (asyncCmd.getSyncObjType() != null && asyncCmd.getSyncObjType().equalsIgnoreCase(BaseAsyncCmd.snapshotHostSyncObject)) { + queueSizeLimit = _createSnapshotQueueSizeLimit; + } + _asyncMgr.syncAsyncJobExecution(asyncCmd.getJob(), asyncCmd.getSyncObjType(), + asyncCmd.getSyncObjId().longValue(), queueSizeLimit); } } diff --git a/server/src/com/cloud/api/ApiServer.java b/server/src/com/cloud/api/ApiServer.java index eb5e7705a5d..a5c9ea5b1ee 100755 --- a/server/src/com/cloud/api/ApiServer.java +++ b/server/src/com/cloud/api/ApiServer.java @@ -116,6 +116,7 @@ import com.cloud.utils.db.Transaction; import com.cloud.utils.exception.CSExceptionErrorCode; import com.cloud.uuididentity.dao.IdentityDao; + public class ApiServer implements HttpRequestHandler { private static final Logger s_logger = Logger.getLogger(ApiServer.class.getName()); private static final Logger s_accessLogger = Logger.getLogger("apiserver." + ApiServer.class.getName()); @@ -299,7 +300,7 @@ public class ApiServer implements HttpRequestHandler { if (apiPort != null) { ListenerThread listenerThread = new ListenerThread(this, apiPort); listenerThread.start(); - } + } } @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -515,14 +516,9 @@ public class ApiServer implements HttpRequestHandler { ctx.setAccountId(asyncCmd.getEntityOwnerId()); - AsyncJobVO job = new AsyncJobVO(); - job.setInstanceId((objectId == null) ? asyncCmd.getInstanceId() : objectId); - job.setInstanceType(asyncCmd.getInstanceType()); - job.setUserId(callerUserId); - job.setAccountId(caller.getId()); - - job.setCmd(cmdObj.getClass().getName()); - job.setCmdInfo(ApiGsonHelper.getBuilder().create().toJson(params)); + Long instanceId = (objectId == null) ? asyncCmd.getInstanceId() : objectId; + AsyncJobVO job = new AsyncJobVO(callerUserId, caller.getId(), cmdObj.getClass().getName(), + ApiGsonHelper.getBuilder().create().toJson(params), instanceId, asyncCmd.getInstanceType()); long jobId = _asyncMgr.submitAsyncJob(job); diff --git a/server/src/com/cloud/async/AsyncJobManager.java b/server/src/com/cloud/async/AsyncJobManager.java index 4664e1b07c0..482499714c6 100644 --- a/server/src/com/cloud/async/AsyncJobManager.java +++ b/server/src/com/cloud/async/AsyncJobManager.java @@ -25,8 +25,9 @@ public interface AsyncJobManager extends Manager { public AsyncJobExecutorContext getExecutorContext(); - public AsyncJobVO getAsyncJob(long jobId); - public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId); + public AsyncJobVO getAsyncJob(long jobId); + public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId); + public List findInstancePendingAsyncJobs(AsyncJob.Type instanceType, Long accountId); public long submitAsyncJob(AsyncJobVO job); @@ -39,7 +40,7 @@ public interface AsyncJobManager extends Manager { public void releaseSyncSource(AsyncJobExecutor executor); - public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId); + public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit); /** * Queries for the status or final result of an async job. diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java index 585ba39b2da..6cf95feb977 100644 --- a/server/src/com/cloud/async/AsyncJobManagerImpl.java +++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java @@ -91,29 +91,30 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe private AccountManager _accountMgr; private AccountDao _accountDao; private AsyncJobDao _jobDao; - private long _jobExpireSeconds = 86400; // 1 day - private long _jobCancelThresholdSeconds = 3600; // 1 hour - private ApiDispatcher _dispatcher; - - private final ScheduledExecutorService _heartbeatScheduler = - Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat")); - private ExecutorService _executor; - - @Override - public AsyncJobExecutorContext getExecutorContext() { - return _context; - } - - @Override - public AsyncJobVO getAsyncJob(long jobId) { - return _jobDao.findById(jobId); - } - - @Override - public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId) { - return _jobDao.findInstancePendingAsyncJob(instanceType, instanceId); + private long _jobExpireSeconds = 86400; // 1 day + private long _jobCancelThresholdSeconds = 3600; // 1 hour (for cancelling the jobs blocking other jobs) + + private ApiDispatcher _dispatcher; + + private final ScheduledExecutorService _heartbeatScheduler = + Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat")); + private ExecutorService _executor; + + @Override + public AsyncJobExecutorContext getExecutorContext() { + return _context; } + @Override + public AsyncJobVO getAsyncJob(long jobId) { + return _jobDao.findById(jobId); + } + + @Override + public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId) { + return _jobDao.findInstancePendingAsyncJob(instanceType, instanceId); + } + @Override public List findInstancePendingAsyncJobs(AsyncJob.Type instanceType, Long accountId) { return _jobDao.findInstancePendingAsyncJobs(instanceType, accountId); @@ -213,47 +214,47 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe job.setProcessStatus(processStatus); if(resultObject != null) { job.setResult(ApiSerializerHelper.toSerializedStringOld(resultObject)); - } - job.setLastUpdated(DateUtil.currentGMTTime()); - _jobDao.update(jobId, job); - txt.commit(); - } catch(Exception e) { - s_logger.error("Unexpected exception while updating async job-" + jobId + " status: ", e); - txt.rollback(); - } - } + } + job.setLastUpdated(DateUtil.currentGMTTime()); + _jobDao.update(jobId, job); + txt.commit(); + } catch(Exception e) { + s_logger.error("Unexpected exception while updating async job-" + jobId + " status: ", e); + txt.rollback(); + } + } - @Override @DB - public void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId) { - if(s_logger.isDebugEnabled()) { - s_logger.debug("Update async-job attachment, job-" + jobId + ", instanceType: " + instanceType + - ", instanceId: " + instanceId); - } - - Transaction txt = Transaction.currentTxn(); - try { - txt.start(); + @Override @DB + public void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId) { + if(s_logger.isDebugEnabled()) { + s_logger.debug("Update async-job attachment, job-" + jobId + ", instanceType: " + instanceType + + ", instanceId: " + instanceId); + } - AsyncJobVO job = _jobDao.createForUpdate(); - //job.setInstanceType(instanceType); - job.setInstanceId(instanceId); - job.setLastUpdated(DateUtil.currentGMTTime()); - _jobDao.update(jobId, job); + Transaction txt = Transaction.currentTxn(); + try { + txt.start(); - txt.commit(); - } catch(Exception e) { - s_logger.error("Unexpected exception while updating async job-" + jobId + " attachment: ", e); - txt.rollback(); - } - } + AsyncJobVO job = _jobDao.createForUpdate(); + //job.setInstanceType(instanceType); + job.setInstanceId(instanceId); + job.setLastUpdated(DateUtil.currentGMTTime()); + _jobDao.update(jobId, job); - @Override - public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId) { - // This method is re-entrant. If an API developer wants to synchronized on an object, e.g. the router, - // when executing business logic, they will call this method (actually a method in BaseAsyncCmd that calls this). - // This method will get called every time their business logic executes. The first time it exectues for a job - // there will be no sync source, but on subsequent execution there will be a sync souce. If this is the first - // time the job executes we queue the job, otherwise we just return so that the business logic can execute. + txt.commit(); + } catch(Exception e) { + s_logger.error("Unexpected exception while updating async job-" + jobId + " attachment: ", e); + txt.rollback(); + } + } + + @Override + public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit) { + // This method is re-entrant. If an API developer wants to synchronized on an object, e.g. the router, + // when executing business logic, they will call this method (actually a method in BaseAsyncCmd that calls this). + // This method will get called every time their business logic executes. The first time it exectues for a job + // there will be no sync source, but on subsequent execution there will be a sync souce. If this is the first + // time the job executes we queue the job, otherwise we just return so that the business logic can execute. if (job.getSyncSource() != null) { return; } @@ -268,9 +269,9 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe // we retry five times until we throw an exception Random random = new Random(); - for(int i = 0; i < 5; i++) { - queue = _queueMgr.queue(syncObjType, syncObjId, "AsyncJob", job.getId()); - if(queue != null) { + for(int i = 0; i < 5; i++) { + queue = _queueMgr.queue(syncObjType, syncObjId, "AsyncJob", job.getId(), queueSizeLimit); + if(queue != null) { break; } @@ -663,41 +664,41 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe if(s_logger.isInfoEnabled()) { s_logger.info("Discard left-over queue item: " + item.toString()); } - - String contentType = item.getContentType(); - if(contentType != null && contentType.equals("AsyncJob")) { - Long jobId = item.getContentId(); - if(jobId != null) { - s_logger.warn("Mark job as failed as its correspoding queue-item has been discarded. job id: " + jobId); - completeAsyncJob(jobId, AsyncJobResult.STATUS_FAILED, 0, getResetResultResponse("Execution was cancelled because of server shutdown")); - } - } - _queueMgr.purgeItem(item.getId()); - } - } - } - - @Override - public boolean configure(String name, Map params) throws ConfigurationException { - _name = name; - - ComponentLocator locator = ComponentLocator.getCurrentLocator(); - - ConfigurationDao configDao = locator.getDao(ConfigurationDao.class); - if (configDao == null) { - throw new ConfigurationException("Unable to get the configuration dao."); - } - int expireMinutes = NumbersUtil.parseInt( - configDao.getValue(Config.JobExpireMinutes.key()), 24*60); - _jobExpireSeconds = (long)expireMinutes*60; - - _jobCancelThresholdSeconds = NumbersUtil.parseInt( - configDao.getValue(Config.JobCancelThresholdMinutes.key()), 60); - _jobCancelThresholdSeconds *= 60; + String contentType = item.getContentType(); + if(contentType != null && contentType.equals("AsyncJob")) { + Long jobId = item.getContentId(); + if(jobId != null) { + s_logger.warn("Mark job as failed as its correspoding queue-item has been discarded. job id: " + jobId); + completeAsyncJob(jobId, AsyncJobResult.STATUS_FAILED, 0, getResetResultResponse("Execution was cancelled because of server shutdown")); + } + } + _queueMgr.purgeItem(item.getId()); + } + } + } - _accountDao = locator.getDao(AccountDao.class); - if (_accountDao == null) { + @Override + public boolean configure(String name, Map params) throws ConfigurationException { + _name = name; + + ComponentLocator locator = ComponentLocator.getCurrentLocator(); + + ConfigurationDao configDao = locator.getDao(ConfigurationDao.class); + if (configDao == null) { + throw new ConfigurationException("Unable to get the configuration dao."); + } + + int expireMinutes = NumbersUtil.parseInt( + configDao.getValue(Config.JobExpireMinutes.key()), 24*60); + _jobExpireSeconds = (long)expireMinutes*60; + + _jobCancelThresholdSeconds = NumbersUtil.parseInt( + configDao.getValue(Config.JobCancelThresholdMinutes.key()), 60); + _jobCancelThresholdSeconds *= 60; + + _accountDao = locator.getDao(AccountDao.class); + if (_accountDao == null) { throw new ConfigurationException("Unable to get " + AccountDao.class.getName()); } _jobDao = locator.getDao(AsyncJobDao.class); @@ -748,23 +749,22 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe } @Override - public void onManagementNodeLeft(List nodeList, long selfNodeId) { - for(ManagementServerHostVO msHost : nodeList) { - Transaction txn = Transaction.open(Transaction.CLOUD_DB); - try { - txn.start(); - List items = _queueMgr.getActiveQueueItems(msHost.getId(), true); - cleanupPendingJobs(items); - _queueMgr.resetQueueProcess(msHost.getId()); - _jobDao.resetJobProcess(msHost.getId(), BaseCmd.INTERNAL_ERROR, getSerializedErrorMessage("job cancelled because of management server restart")); - txn.commit(); - } catch(Throwable e) { - s_logger.warn("Unexpected exception ", e); - txn.rollback(); - } finally { - txn.close(); - } - } + public void onManagementNodeLeft(List nodeList, long selfNodeId) { + for(ManagementServerHostVO msHost : nodeList) { + Transaction txn = Transaction.open(Transaction.CLOUD_DB); + try { + txn.start(); + List items = _queueMgr.getActiveQueueItems(msHost.getId(), true); + cleanupPendingJobs(items); + _jobDao.resetJobProcess(msHost.getId(), BaseCmd.INTERNAL_ERROR, getSerializedErrorMessage("job cancelled because of management server restart")); + txn.commit(); + } catch(Throwable e) { + s_logger.warn("Unexpected exception ", e); + txn.rollback(); + } finally { + txn.close(); + } + } } @Override @@ -773,21 +773,20 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe @Override public boolean start() { - try { - List l = _queueMgr.getActiveQueueItems(getMsid(), false); - cleanupPendingJobs(l); - _queueMgr.resetQueueProcess(getMsid()); - _jobDao.resetJobProcess(getMsid(), BaseCmd.INTERNAL_ERROR, getSerializedErrorMessage("job cancelled because of management server restart")); - } catch(Throwable e) { - s_logger.error("Unexpected exception " + e.getMessage(), e); - } - - _heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), HEARTBEAT_INTERVAL, - HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS); - _heartbeatScheduler.scheduleAtFixedRate(getGCTask(), GC_INTERVAL, - GC_INTERVAL, TimeUnit.MILLISECONDS); - - return true; + try { + List l = _queueMgr.getActiveQueueItems(getMsid(), false); + cleanupPendingJobs(l); + _jobDao.resetJobProcess(getMsid(), BaseCmd.INTERNAL_ERROR, getSerializedErrorMessage("job cancelled because of management server restart")); + } catch(Throwable e) { + s_logger.error("Unexpected exception " + e.getMessage(), e); + } + + _heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), HEARTBEAT_INTERVAL, + HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS); + _heartbeatScheduler.scheduleAtFixedRate(getGCTask(), GC_INTERVAL, + GC_INTERVAL, TimeUnit.MILLISECONDS); + + return true; } private static ExceptionResponse getResetResultResponse(String errorMessage) { diff --git a/server/src/com/cloud/async/SyncQueueManager.java b/server/src/com/cloud/async/SyncQueueManager.java index 1dd82530c2a..b605f1b8670 100644 --- a/server/src/com/cloud/async/SyncQueueManager.java +++ b/server/src/com/cloud/async/SyncQueueManager.java @@ -20,14 +20,14 @@ import java.util.List; import com.cloud.utils.component.Manager; -public interface SyncQueueManager extends Manager { - public SyncQueueVO queue(String syncObjType, long syncObjId, String itemType, long itemId); - public SyncQueueItemVO dequeueFromOne(long queueId, Long msid); - public List dequeueFromAny(Long msid, int maxItems); - public void purgeItem(long queueItemId); + +public interface SyncQueueManager extends Manager { + public SyncQueueVO queue(String syncObjType, long syncObjId, String itemType, long itemId, long queueSizeLimit); + public SyncQueueItemVO dequeueFromOne(long queueId, Long msid); + public List dequeueFromAny(Long msid, int maxItems); + public void purgeItem(long queueItemId); public void returnItem(long queueItemId); public List getActiveQueueItems(Long msid, boolean exclusive); public List getBlockedQueueItems(long thresholdMs, boolean exclusive); - public void resetQueueProcess(long msid); -} +} diff --git a/server/src/com/cloud/async/SyncQueueManagerImpl.java b/server/src/com/cloud/async/SyncQueueManagerImpl.java index b5f3c4fd92e..c3f49557b00 100644 --- a/server/src/com/cloud/async/SyncQueueManagerImpl.java +++ b/server/src/com/cloud/async/SyncQueueManagerImpl.java @@ -34,165 +34,170 @@ import com.cloud.utils.db.DB; import com.cloud.utils.db.Transaction; import com.cloud.utils.exception.CloudRuntimeException; -@Local(value={SyncQueueManager.class}) -public class SyncQueueManagerImpl implements SyncQueueManager { - public static final Logger s_logger = Logger.getLogger(SyncQueueManagerImpl.class.getName()); - - private String _name; - - private SyncQueueDao _syncQueueDao; - private SyncQueueItemDao _syncQueueItemDao; + +@Local(value={SyncQueueManager.class}) +public class SyncQueueManagerImpl implements SyncQueueManager { + public static final Logger s_logger = Logger.getLogger(SyncQueueManagerImpl.class.getName()); + + private String _name; + + private SyncQueueDao _syncQueueDao; + private SyncQueueItemDao _syncQueueItemDao; + + @Override + @DB + public SyncQueueVO queue(String syncObjType, long syncObjId, String itemType, long itemId, long queueSizeLimit) { + Transaction txn = Transaction.currentTxn(); + try { + txn.start(); + + _syncQueueDao.ensureQueue(syncObjType, syncObjId); + SyncQueueVO queueVO = _syncQueueDao.find(syncObjType, syncObjId); + if(queueVO == null) + throw new CloudRuntimeException("Unable to queue item into DB, DB is full?"); - @Override - @DB - public SyncQueueVO queue(String syncObjType, long syncObjId, String itemType, long itemId) { - Transaction txn = Transaction.currentTxn(); - try { - txn.start(); - - _syncQueueDao.ensureQueue(syncObjType, syncObjId); - SyncQueueVO queueVO = _syncQueueDao.find(syncObjType, syncObjId); - if(queueVO == null) - throw new CloudRuntimeException("Unable to queue item into DB, DB is full?"); - - - Date dt = DateUtil.currentGMTTime(); - SyncQueueItemVO item = new SyncQueueItemVO(); - item.setQueueId(queueVO.getId()); - item.setContentType(itemType); - item.setContentId(itemId); - item.setCreated(dt); - - _syncQueueItemDao.persist(item); - txn.commit(); - - return queueVO; - } catch(Exception e) { - s_logger.error("Unexpected exception: ", e); - txn.rollback(); - } - return null; - } - - @Override - @DB - public SyncQueueItemVO dequeueFromOne(long queueId, Long msid) { - Transaction txt = Transaction.currentTxn(); - try { - txt.start(); - - SyncQueueVO queueVO = _syncQueueDao.lockRow(queueId, true); - if(queueVO == null) { - s_logger.error("Sync queue(id: " + queueId + ") does not exist"); - txt.commit(); - return null; - } - - if(queueVO.getLastProcessTime() == null) { - SyncQueueItemVO itemVO = _syncQueueItemDao.getNextQueueItem(queueVO.getId()); - if(itemVO != null) { - Long processNumber = queueVO.getLastProcessNumber(); - if(processNumber == null) - processNumber = new Long(1); - else - processNumber = processNumber + 1; - Date dt = DateUtil.currentGMTTime(); - queueVO.setLastProcessMsid(msid); - queueVO.setLastProcessNumber(processNumber); - queueVO.setLastProcessTime(dt); + queueVO.setQueueSizeLimit(queueSizeLimit); + _syncQueueDao.update(queueVO.getId(), queueVO); + + Date dt = DateUtil.currentGMTTime(); + SyncQueueItemVO item = new SyncQueueItemVO(); + item.setQueueId(queueVO.getId()); + item.setContentType(itemType); + item.setContentId(itemId); + item.setCreated(dt); + + _syncQueueItemDao.persist(item); + txn.commit(); + + return queueVO; + } catch(Exception e) { + s_logger.error("Unexpected exception: ", e); + txn.rollback(); + } + return null; + } + + @Override + @DB + public SyncQueueItemVO dequeueFromOne(long queueId, Long msid) { + Transaction txt = Transaction.currentTxn(); + try { + txt.start(); + + SyncQueueVO queueVO = _syncQueueDao.lockRow(queueId, true); + if(queueVO == null) { + s_logger.error("Sync queue(id: " + queueId + ") does not exist"); + txt.commit(); + return null; + } + + if(queueReadyToProcess(queueVO)) { + SyncQueueItemVO itemVO = _syncQueueItemDao.getNextQueueItem(queueVO.getId()); + if(itemVO != null) { + Long processNumber = queueVO.getLastProcessNumber(); + if(processNumber == null) + processNumber = new Long(1); + else + processNumber = processNumber + 1; + Date dt = DateUtil.currentGMTTime(); + queueVO.setLastProcessNumber(processNumber); queueVO.setLastUpdated(dt); - _syncQueueDao.update(queueVO.getId(), queueVO); - - itemVO.setLastProcessMsid(msid); + queueVO.setQueueSize(queueVO.getQueueSize() + 1); + _syncQueueDao.update(queueVO.getId(), queueVO); + + itemVO.setLastProcessMsid(msid); itemVO.setLastProcessNumber(processNumber); - _syncQueueItemDao.update(itemVO.getId(), itemVO); - - txt.commit(); - return itemVO; - } else { - if(s_logger.isDebugEnabled()) - s_logger.debug("Sync queue (" + queueId + ") is currently empty"); - } - } else { - if(s_logger.isDebugEnabled()) - s_logger.debug("There is a pending process in sync queue(id: " + queueId + ")"); - } - txt.commit(); - } catch(Exception e) { - s_logger.error("Unexpected exception: ", e); - txt.rollback(); - } - - return null; - } - - @Override - @DB - public List dequeueFromAny(Long msid, int maxItems) { - - List resultList = new ArrayList(); - Transaction txt = Transaction.currentTxn(); - try { - txt.start(); - - List l = _syncQueueItemDao.getNextQueueItems(maxItems); - if(l != null && l.size() > 0) { - for(SyncQueueItemVO item : l) { - SyncQueueVO queueVO = _syncQueueDao.lockRow(item.getQueueId(), true); - SyncQueueItemVO itemVO = _syncQueueItemDao.lockRow(item.getId(), true); - if(queueVO.getLastProcessTime() == null && itemVO.getLastProcessNumber() == null) { - Long processNumber = queueVO.getLastProcessNumber(); - if(processNumber == null) - processNumber = new Long(1); - else - processNumber = processNumber + 1; - - Date dt = DateUtil.currentGMTTime(); - queueVO.setLastProcessMsid(msid); + itemVO.setLastProcessTime(dt); + _syncQueueItemDao.update(itemVO.getId(), itemVO); + + txt.commit(); + return itemVO; + } else { + if(s_logger.isDebugEnabled()) + s_logger.debug("Sync queue (" + queueId + ") is currently empty"); + } + } else { + if(s_logger.isDebugEnabled()) + s_logger.debug("There is a pending process in sync queue(id: " + queueId + ")"); + } + txt.commit(); + } catch(Exception e) { + s_logger.error("Unexpected exception: ", e); + txt.rollback(); + } + + return null; + } + + @Override + @DB + public List dequeueFromAny(Long msid, int maxItems) { + + List resultList = new ArrayList(); + Transaction txt = Transaction.currentTxn(); + try { + txt.start(); + + List l = _syncQueueItemDao.getNextQueueItems(maxItems); + if(l != null && l.size() > 0) { + for(SyncQueueItemVO item : l) { + SyncQueueVO queueVO = _syncQueueDao.lockRow(item.getQueueId(), true); + SyncQueueItemVO itemVO = _syncQueueItemDao.lockRow(item.getId(), true); + if(queueReadyToProcess(queueVO) && itemVO.getLastProcessNumber() == null) { + Long processNumber = queueVO.getLastProcessNumber(); + if(processNumber == null) + processNumber = new Long(1); + else + processNumber = processNumber + 1; + + Date dt = DateUtil.currentGMTTime(); queueVO.setLastProcessNumber(processNumber); - queueVO.setLastProcessTime(dt); queueVO.setLastUpdated(dt); - _syncQueueDao.update(queueVO.getId(), queueVO); - - itemVO.setLastProcessMsid(msid); + queueVO.setQueueSize(queueVO.getQueueSize() + 1); + _syncQueueDao.update(queueVO.getId(), queueVO); + + itemVO.setLastProcessMsid(msid); itemVO.setLastProcessNumber(processNumber); - _syncQueueItemDao.update(item.getId(), itemVO); - - resultList.add(item); - } - } - } - txt.commit(); - return resultList; - } catch(Exception e) { - s_logger.error("Unexpected exception: ", e); - txt.rollback(); - } - return null; - } - - @Override - @DB - public void purgeItem(long queueItemId) { - Transaction txt = Transaction.currentTxn(); - try { - txt.start(); - - SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId); - if(itemVO != null) { - SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true); - - _syncQueueItemDao.expunge(itemVO.getId()); - - queueVO.setLastProcessTime(null); + itemVO.setLastProcessTime(dt); + _syncQueueItemDao.update(item.getId(), itemVO); + + resultList.add(item); + } + } + } + txt.commit(); + return resultList; + } catch(Exception e) { + s_logger.error("Unexpected exception: ", e); + txt.rollback(); + } + return null; + } + + @Override + @DB + public void purgeItem(long queueItemId) { + Transaction txt = Transaction.currentTxn(); + try { + txt.start(); + + SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId); + if(itemVO != null) { + SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true); + + _syncQueueItemDao.expunge(itemVO.getId()); + queueVO.setLastUpdated(DateUtil.currentGMTTime()); - _syncQueueDao.update(queueVO.getId(), queueVO); - } - txt.commit(); - } catch(Exception e) { - s_logger.error("Unexpected exception: ", e); - txt.rollback(); - } + //decrement the count + assert (queueVO.getQueueSize() > 0) : "Count reduce happens when it's already <= 0!"; + queueVO.setQueueSize(queueVO.getQueueSize() - 1); + _syncQueueDao.update(queueVO.getId(), queueVO); + } + txt.commit(); + } catch(Exception e) { + s_logger.error("Unexpected exception: ", e); + txt.rollback(); + } } @Override @@ -208,9 +213,9 @@ public class SyncQueueManagerImpl implements SyncQueueManager { itemVO.setLastProcessMsid(null); itemVO.setLastProcessNumber(null); + itemVO.setLastProcessTime(null); _syncQueueItemDao.update(queueItemId, itemVO); - queueVO.setLastProcessTime(null); queueVO.setLastUpdated(DateUtil.currentGMTTime()); _syncQueueDao.update(queueVO.getId(), queueVO); } @@ -231,44 +236,42 @@ public class SyncQueueManagerImpl implements SyncQueueManager { return _syncQueueItemDao.getBlockedQueueItems(thresholdMs, exclusive); } - @Override - public void resetQueueProcess(long msid) { - _syncQueueDao.resetQueueProcessing(msid); - } - - @Override - public boolean configure(String name, Map params) throws ConfigurationException { - _name = name; + @Override + public boolean configure(String name, Map params) throws ConfigurationException { + _name = name; ComponentLocator locator = ComponentLocator.getCurrentLocator(); - - _syncQueueDao = locator.getDao(SyncQueueDao.class); - if (_syncQueueDao == null) { - throw new ConfigurationException("Unable to get " - + SyncQueueDao.class.getName()); + + _syncQueueDao = locator.getDao(SyncQueueDao.class); + if (_syncQueueDao == null) { + throw new ConfigurationException("Unable to get " + + SyncQueueDao.class.getName()); + } + + _syncQueueItemDao = locator.getDao(SyncQueueItemDao.class); + if (_syncQueueItemDao == null) { + throw new ConfigurationException("Unable to get " + + SyncQueueDao.class.getName()); } - - _syncQueueItemDao = locator.getDao(SyncQueueItemDao.class); - if (_syncQueueItemDao == null) { - throw new ConfigurationException("Unable to get " - + SyncQueueDao.class.getName()); - } - - return true; + + return true; + } + + @Override + public boolean start() { + return true; + } + + @Override + public boolean stop() { + return true; + } + + @Override + public String getName() { + return _name; } - @Override - public boolean start() { - return true; - } - - @Override - public boolean stop() { - return true; - } - - @Override - public String getName() { - return _name; - } -} - + private boolean queueReadyToProcess(SyncQueueVO queueVO) { + return queueVO.getQueueSize() < queueVO.getQueueSizeLimit(); + } +} \ No newline at end of file diff --git a/server/src/com/cloud/async/dao/SyncQueueDao.java b/server/src/com/cloud/async/dao/SyncQueueDao.java index 1b04973d6f9..816f9dd1c30 100644 --- a/server/src/com/cloud/async/dao/SyncQueueDao.java +++ b/server/src/com/cloud/async/dao/SyncQueueDao.java @@ -22,5 +22,4 @@ import com.cloud.utils.db.GenericDao; public interface SyncQueueDao extends GenericDao{ public void ensureQueue(String syncObjType, long syncObjId); public SyncQueueVO find(String syncObjType, long syncObjId); - public void resetQueueProcessing(long msid); -} +} diff --git a/server/src/com/cloud/async/dao/SyncQueueDaoImpl.java b/server/src/com/cloud/async/dao/SyncQueueDaoImpl.java index a40edb26df7..bfe8c0fe451 100644 --- a/server/src/com/cloud/async/dao/SyncQueueDaoImpl.java +++ b/server/src/com/cloud/async/dao/SyncQueueDaoImpl.java @@ -28,7 +28,6 @@ import org.apache.log4j.Logger; import com.cloud.async.SyncQueueVO; import com.cloud.utils.DateUtil; -import com.cloud.utils.db.DB; import com.cloud.utils.db.GenericDaoBase; import com.cloud.utils.db.SearchBuilder; import com.cloud.utils.db.SearchCriteria; @@ -43,8 +42,9 @@ public class SyncQueueDaoImpl extends GenericDaoBase implemen @Override public void ensureQueue(String syncObjType, long syncObjId) { Date dt = DateUtil.currentGMTTime(); - String sql = "INSERT IGNORE INTO sync_queue(sync_objtype, sync_objid, created, last_updated) values(?, ?, ?, ?)"; - + String sql = "INSERT IGNORE INTO sync_queue(sync_objtype, sync_objid, created, last_updated)" + + " values(?, ?, ?, ?)"; + Transaction txn = Transaction.currentTxn(); PreparedStatement pstmt = null; try { @@ -60,7 +60,7 @@ public class SyncQueueDaoImpl extends GenericDaoBase implemen s_logger.warn("Unable to create sync queue " + syncObjType + "-" + syncObjId + ":" + e.getMessage(), e); } } - + @Override public SyncQueueVO find(String syncObjType, long syncObjId) { SearchCriteria sc = TypeIdSearch.create(); @@ -69,23 +69,6 @@ public class SyncQueueDaoImpl extends GenericDaoBase implemen return findOneBy(sc); } - @Override @DB - public void resetQueueProcessing(long msid) { - String sql = "UPDATE sync_queue set queue_proc_msid=NULL, queue_proc_time=NULL where queue_proc_msid=?"; - - Transaction txn = Transaction.currentTxn(); - PreparedStatement pstmt = null; - try { - pstmt = txn.prepareAutoCloseStatement(sql); - pstmt.setLong(1, msid); - pstmt.execute(); - } catch (SQLException e) { - s_logger.warn("Unable to reset sync queue for management server " + msid, e); - } catch (Throwable e) { - s_logger.warn("Unable to reset sync queue for management server " + msid, e); - } - } - protected SyncQueueDaoImpl() { super(); TypeIdSearch = createSearchBuilder(); diff --git a/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java b/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java index e6a4e53875c..ce212981d50 100644 --- a/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java +++ b/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java @@ -30,11 +30,9 @@ import javax.ejb.Local; import org.apache.log4j.Logger; import com.cloud.async.SyncQueueItemVO; -import com.cloud.async.SyncQueueVO; import com.cloud.utils.DateUtil; import com.cloud.utils.db.Filter; import com.cloud.utils.db.GenericDaoBase; -import com.cloud.utils.db.JoinBuilder; import com.cloud.utils.db.SearchBuilder; import com.cloud.utils.db.SearchCriteria; import com.cloud.utils.db.Transaction; @@ -42,8 +40,6 @@ import com.cloud.utils.db.Transaction; @Local(value = { SyncQueueItemDao.class }) public class SyncQueueItemDaoImpl extends GenericDaoBase implements SyncQueueItemDao { private static final Logger s_logger = Logger.getLogger(SyncQueueItemDaoImpl.class); - - private final SyncQueueDao _syncQueueDao = new SyncQueueDaoImpl(); @Override @@ -71,7 +67,7 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase String sql = "SELECT i.id, i.queue_id, i.content_type, i.content_id, i.created " + " FROM sync_queue AS q JOIN sync_queue_item AS i ON q.id = i.queue_id " + - " WHERE q.queue_proc_time IS NULL AND i.queue_proc_number IS NULL " + + " WHERE q.queue_size < q.queue_size_limit AND i.queue_proc_number IS NULL " + " GROUP BY q.id " + " ORDER BY i.id " + " LIMIT 0, ?"; @@ -120,23 +116,18 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase @Override public List getBlockedQueueItems(long thresholdMs, boolean exclusive) { Date cutTime = DateUtil.currentGMTTime(); - cutTime = new Date(cutTime.getTime() - thresholdMs); - - SearchBuilder sbQueue = _syncQueueDao.createSearchBuilder(); - sbQueue.and("lastProcessTime", sbQueue.entity().getLastProcessTime(), SearchCriteria.Op.NNULL); - sbQueue.and("lastProcessTime2", sbQueue.entity().getLastProcessTime(), SearchCriteria.Op.LT); SearchBuilder sbItem = createSearchBuilder(); - sbItem.join("queueItemJoinQueue", sbQueue, sbQueue.entity().getId(), sbItem.entity().getQueueId(), JoinBuilder.JoinType.INNER); sbItem.and("lastProcessMsid", sbItem.entity().getLastProcessMsid(), SearchCriteria.Op.NNULL); sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessNumber(), SearchCriteria.Op.NNULL); - - sbQueue.done(); + sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.NNULL); + sbItem.and("lastProcessTime2", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.LT); + sbItem.done(); - + SearchCriteria sc = sbItem.create(); - sc.setJoinParameters("queueItemJoinQueue", "lastProcessTime2", cutTime); - + sc.setParameters("lastProcessTime2", new Date(cutTime.getTime() - thresholdMs)); + if(exclusive) return lockRows(sc, null, true); return listBy(sc, null); diff --git a/server/src/com/cloud/configuration/Config.java b/server/src/com/cloud/configuration/Config.java index dbcc97a3739..c214bd02478 100755 --- a/server/src/com/cloud/configuration/Config.java +++ b/server/src/com/cloud/configuration/Config.java @@ -139,8 +139,7 @@ public enum Config { // Advanced JobExpireMinutes("Advanced", ManagementServer.class, String.class, "job.expire.minutes", "1440", "Time (in minutes) for async-jobs to be kept in system", null), JobCancelThresholdMinutes("Advanced", ManagementServer.class, String.class, "job.cancel.threshold.minutes", "60", "Time (in minutes) for async-jobs to be forcely cancelled if it has been in process for long", null), - SwiftEnable( - "Advanced", ManagementServer.class, Boolean.class, "swift.enable", "false", "enable swift ", null), + SwiftEnable("Advanced", ManagementServer.class, Boolean.class, "swift.enable", "false", "enable swift ", null), EventPurgeInterval("Advanced", ManagementServer.class, Integer.class, "event.purge.interval", "86400", "The interval (in seconds) to wait before running the event purge thread", null), AccountCleanupInterval("Advanced", ManagementServer.class, Integer.class, "account.cleanup.interval", "86400", "The interval (in seconds) between cleanup for removed accounts", null), @@ -349,7 +348,10 @@ public enum Config { SecondaryStorageServiceOffering("Advanced", ManagementServer.class, Long.class, "secstorage.service.offering", null, "Service offering used by secondary storage; if NULL - system offering will be used", null), HaTag("Advanced", ManagementServer.class, String.class, "ha.tag", null, "HA tag defining that the host marked with this tag can be used for HA purposes only", null), VpcCleanupInterval("Advanced", ManagementServer.class, Integer.class, "vpc.cleanup.interval", "3600", "The interval (in seconds) between cleanup for Inactive VPCs", null), - VpcMaxNetworks("Advanced", ManagementServer.class, Integer.class, "vpc.max.networks", "3", "Maximum number of networks per vpc", null); + VpcMaxNetworks("Advanced", ManagementServer.class, Integer.class, "vpc.max.networks", "3", "Maximum number of networks per vpc", null), + + ConcurrentSnapshotsThresholdPerHost("Advanced", ManagementServer.class, String.class, "concurrent.snapshots.threshold.perhost", + "10", "Limits number of snapshots that can be handled by the host concurrently", null); private final String _category; diff --git a/server/src/com/cloud/storage/snapshot/SnapshotManagerImpl.java b/server/src/com/cloud/storage/snapshot/SnapshotManagerImpl.java index d89a6d97678..2410d085684 100755 --- a/server/src/com/cloud/storage/snapshot/SnapshotManagerImpl.java +++ b/server/src/com/cloud/storage/snapshot/SnapshotManagerImpl.java @@ -195,15 +195,13 @@ public class SnapshotManagerImpl implements SnapshotManager, SnapshotService, Ma protected Answer sendToPool(Volume vol, Command cmd) { StoragePool pool = _storagePoolDao.findById(vol.getPoolId()); - VMInstanceVO vm = _vmDao.findById(vol.getInstanceId()); - + long[] hostIdsToTryFirst = null; - if (vm != null) { - if(vm.getHostId() != null) { - hostIdsToTryFirst = new long[] { vm.getHostId() }; - } else if(vm.getLastHostId() != null) { - hostIdsToTryFirst = new long[] { vm.getLastHostId() }; - } + + Long vmHostId = getHostIdForSnapshotOperation(vol); + + if (vmHostId != null) { + hostIdsToTryFirst = new long[] { vmHostId }; } List hostIdsToAvoid = new ArrayList(); @@ -235,6 +233,19 @@ public class SnapshotManagerImpl implements SnapshotManager, SnapshotService, Ma return null; } + @Override + public Long getHostIdForSnapshotOperation(Volume vol) { + VMInstanceVO vm = _vmDao.findById(vol.getInstanceId()); + if (vm != null) { + if(vm.getHostId() != null) { + return vm.getHostId(); + } else if(vm.getLastHostId() != null) { + return vm.getLastHostId(); + } + } + return null; + } + @Override public SnapshotVO createSnapshotOnPrimary(VolumeVO volume, Long policyId, Long snapshotId) { SnapshotVO snapshot = _snapshotDao.findById(snapshotId); diff --git a/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java b/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java index 8c099940b52..80c6a236a42 100644 --- a/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java +++ b/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java @@ -28,6 +28,7 @@ import javax.naming.ConfigurationException; import org.apache.log4j.Logger; +import com.cloud.api.ApiConstants; import com.cloud.api.ApiDispatcher; import com.cloud.api.ApiGsonHelper; import com.cloud.api.commands.CreateSnapshotCmd; @@ -36,6 +37,7 @@ import com.cloud.async.AsyncJobManager; import com.cloud.async.AsyncJobResult; import com.cloud.async.AsyncJobVO; import com.cloud.async.dao.AsyncJobDao; +import com.cloud.configuration.Config; import com.cloud.configuration.dao.ConfigurationDao; import com.cloud.event.EventTypes; import com.cloud.event.EventUtils; @@ -235,10 +237,10 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler { EventTypes.EVENT_SNAPSHOT_CREATE, "creating snapshot for volume Id:"+volumeId,0); Map params = new HashMap(); - params.put("volumeid", ""+volumeId); - params.put("policyid", ""+policyId); + params.put(ApiConstants.VOLUME_ID, "" + volumeId); + params.put(ApiConstants.POLICY_ID, "" + policyId); params.put("ctxUserId", "1"); - params.put("ctxAccountId", "1"); + params.put("ctxAccountId", "" + volume.getAccountId()); params.put("ctxStartEventId", String.valueOf(eventId)); CreateSnapshotCmd cmd = new CreateSnapshotCmd(); @@ -246,15 +248,10 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler { params.put("id", ""+cmd.getEntityId()); params.put("ctxStartEventId", "1"); - AsyncJobVO job = new AsyncJobVO(); - job.setUserId(userId); - // Just have SYSTEM own the job for now. Users won't be able to see this job, but - // it's an internal job so probably not a huge deal. - job.setAccountId(1L); - job.setCmd(CreateSnapshotCmd.class.getName()); - job.setInstanceId(cmd.getEntityId()); - job.setCmdInfo(ApiGsonHelper.getBuilder().create().toJson(params)); - + AsyncJobVO job = new AsyncJobVO(User.UID_SYSTEM, volume.getAccountId(), CreateSnapshotCmd.class.getName(), + ApiGsonHelper.getBuilder().create().toJson(params), cmd.getEntityId(), + cmd.getInstanceType()); + long jobId = _asyncMgr.submitAsyncJob(job); tmpSnapshotScheduleVO.setAsyncJobId(jobId); @@ -359,6 +356,7 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler { _testTimerTask = new TestClock(this, minutesPerHour, hoursPerDay, daysPerWeek, daysPerMonth, weeksPerMonth, monthsPerYear); } _currentTimestamp = new Date(); + s_logger.info("Snapshot Scheduler is configured."); return true; diff --git a/server/test/com/cloud/async/TestSyncQueueManager.java b/server/test/com/cloud/async/TestSyncQueueManager.java index f8be9b696b6..2bbf7bcc8bd 100644 --- a/server/test/com/cloud/async/TestSyncQueueManager.java +++ b/server/test/com/cloud/async/TestSyncQueueManager.java @@ -32,178 +32,178 @@ public class TestSyncQueueManager extends ComponentTestCase { private volatile int count = 0; private volatile long expectingCurrent = 1; - public void leftOverItems() { - SyncQueueManager mgr = ComponentLocator.getCurrentLocator().getManager( - SyncQueueManager.class); - - List l = mgr.getActiveQueueItems(1L, false); - if(l != null && l.size() > 0) { - for(SyncQueueItemVO item : l) { - s_logger.info("Left over item: " + item.toString()); - mgr.purgeItem(item.getId()); - } - } - } - - public void dequeueFromOneQueue() { - final SyncQueueManager mgr = ComponentLocator.getCurrentLocator().getManager( - SyncQueueManager.class); - - final int totalRuns = 5000; - final SyncQueueVO queue = mgr.queue("vm_instance", 1L, "Async-job", 1); - for(int i = 1; i < totalRuns; i++) - mgr.queue("vm_instance", 1L, "Async-job", i+1); - - count = 0; - expectingCurrent = 1; - Thread thread1 = new Thread(new Runnable() { - public void run() { - while(count < totalRuns) { - SyncQueueItemVO item = mgr.dequeueFromOne(queue.getId(), 1L); - if(item != null) { - s_logger.info("Thread 1 process item: " + item.toString()); - - Assert.assertEquals(expectingCurrent, item.getContentId().longValue()); - expectingCurrent++; - count++; - - mgr.purgeItem(item.getId()); - } - try { - Thread.sleep(getRandomMilliseconds(1, 10)); - } catch (InterruptedException e) { - } - } - } - } - ); - - Thread thread2 = new Thread(new Runnable() { - public void run() { - while(count < totalRuns) { - SyncQueueItemVO item = mgr.dequeueFromOne(queue.getId(), 1L); - if(item != null) { - s_logger.info("Thread 2 process item: " + item.toString()); - - Assert.assertEquals(expectingCurrent, item.getContentId().longValue()); - expectingCurrent++; - count++; - mgr.purgeItem(item.getId()); - } - - try { - Thread.sleep(getRandomMilliseconds(1, 10)); - } catch (InterruptedException e) { - } - } - } - } - ); - - thread1.start(); - thread2.start(); - try { - thread1.join(); - } catch (InterruptedException e) { - } - try { - thread2.join(); - } catch (InterruptedException e) { - } - - Assert.assertEquals(totalRuns, count); - } - - public void dequeueFromAnyQueue() { - final SyncQueueManager mgr = ComponentLocator.getCurrentLocator().getManager( - SyncQueueManager.class); - - // simulate 30 queues - final int queues = 30; - final int totalRuns = 100; - final int itemsPerRun = 20; - for(int q = 1; q <= queues; q++) - for(int i = 0; i < totalRuns; i++) - mgr.queue("vm_instance", q, "Async-job", i+1); - - count = 0; - Thread thread1 = new Thread(new Runnable() { - public void run() { - while(count < totalRuns*queues) { - List l = mgr.dequeueFromAny(1L, itemsPerRun); - if(l != null && l.size() > 0) { - s_logger.info("Thread 1 get " + l.size() + " dequeued items"); - - for(SyncQueueItemVO item : l) { - s_logger.info("Thread 1 process item: " + item.toString()); - count++; - - mgr.purgeItem(item.getId()); - } - } - try { - Thread.sleep(getRandomMilliseconds(1, 10)); - } catch (InterruptedException e) { - } - } - } - } - ); - - Thread thread2 = new Thread(new Runnable() { - public void run() { - while(count < totalRuns*queues) { - List l = mgr.dequeueFromAny(1L, itemsPerRun); - if(l != null && l.size() > 0) { - s_logger.info("Thread 2 get " + l.size() + " dequeued items"); - - for(SyncQueueItemVO item : l) { - s_logger.info("Thread 2 process item: " + item.toString()); - count++; - mgr.purgeItem(item.getId()); - } - } - - try { - Thread.sleep(getRandomMilliseconds(1, 10)); - } catch (InterruptedException e) { - } - } - } - } - ); - - thread1.start(); - thread2.start(); - try { - thread1.join(); - } catch (InterruptedException e) { - } - try { - thread2.join(); - } catch (InterruptedException e) { - } - Assert.assertEquals(queues*totalRuns, count); - } - - public void testPopulateQueueData() { - final int queues = 30000; - final int totalRuns = 100; - - final SyncQueueManager mgr = ComponentLocator.getCurrentLocator().getManager( - SyncQueueManager.class); - for(int q = 1; q <= queues; q++) - for(int i = 0; i < totalRuns; i++) - mgr.queue("vm_instance", q, "Async-job", i+1); + public void leftOverItems() { + SyncQueueManager mgr = ComponentLocator.getCurrentLocator().getManager( + SyncQueueManager.class); + + List l = mgr.getActiveQueueItems(1L, false); + if(l != null && l.size() > 0) { + for(SyncQueueItemVO item : l) { + s_logger.info("Left over item: " + item.toString()); + mgr.purgeItem(item.getId()); + } + } + } + + public void dequeueFromOneQueue() { + final SyncQueueManager mgr = ComponentLocator.getCurrentLocator().getManager( + SyncQueueManager.class); + + final int totalRuns = 5000; + final SyncQueueVO queue = mgr.queue("vm_instance", 1L, "Async-job", 1, 1); + for(int i = 1; i < totalRuns; i++) + mgr.queue("vm_instance", 1L, "Async-job", i+1, 1); + + count = 0; + expectingCurrent = 1; + Thread thread1 = new Thread(new Runnable() { + public void run() { + while(count < totalRuns) { + SyncQueueItemVO item = mgr.dequeueFromOne(queue.getId(), 1L); + if(item != null) { + s_logger.info("Thread 1 process item: " + item.toString()); + + Assert.assertEquals(expectingCurrent, item.getContentId().longValue()); + expectingCurrent++; + count++; + + mgr.purgeItem(item.getId()); + } + try { + Thread.sleep(getRandomMilliseconds(1, 10)); + } catch (InterruptedException e) { + } + } + } + } + ); + + Thread thread2 = new Thread(new Runnable() { + public void run() { + while(count < totalRuns) { + SyncQueueItemVO item = mgr.dequeueFromOne(queue.getId(), 1L); + if(item != null) { + s_logger.info("Thread 2 process item: " + item.toString()); + + Assert.assertEquals(expectingCurrent, item.getContentId().longValue()); + expectingCurrent++; + count++; + mgr.purgeItem(item.getId()); + } + + try { + Thread.sleep(getRandomMilliseconds(1, 10)); + } catch (InterruptedException e) { + } + } + } + } + ); + + thread1.start(); + thread2.start(); + try { + thread1.join(); + } catch (InterruptedException e) { + } + try { + thread2.join(); + } catch (InterruptedException e) { + } + + Assert.assertEquals(totalRuns, count); + } + + public void dequeueFromAnyQueue() { + final SyncQueueManager mgr = ComponentLocator.getCurrentLocator().getManager( + SyncQueueManager.class); + + // simulate 30 queues + final int queues = 30; + final int totalRuns = 100; + final int itemsPerRun = 20; + for(int q = 1; q <= queues; q++) + for(int i = 0; i < totalRuns; i++) + mgr.queue("vm_instance", q, "Async-job", i+1, 1); + + count = 0; + Thread thread1 = new Thread(new Runnable() { + public void run() { + while(count < totalRuns*queues) { + List l = mgr.dequeueFromAny(1L, itemsPerRun); + if(l != null && l.size() > 0) { + s_logger.info("Thread 1 get " + l.size() + " dequeued items"); + + for(SyncQueueItemVO item : l) { + s_logger.info("Thread 1 process item: " + item.toString()); + count++; + + mgr.purgeItem(item.getId()); + } + } + try { + Thread.sleep(getRandomMilliseconds(1, 10)); + } catch (InterruptedException e) { + } + } + } + } + ); + + Thread thread2 = new Thread(new Runnable() { + public void run() { + while(count < totalRuns*queues) { + List l = mgr.dequeueFromAny(1L, itemsPerRun); + if(l != null && l.size() > 0) { + s_logger.info("Thread 2 get " + l.size() + " dequeued items"); + + for(SyncQueueItemVO item : l) { + s_logger.info("Thread 2 process item: " + item.toString()); + count++; + mgr.purgeItem(item.getId()); + } + } + + try { + Thread.sleep(getRandomMilliseconds(1, 10)); + } catch (InterruptedException e) { + } + } + } + } + ); + + thread1.start(); + thread2.start(); + try { + thread1.join(); + } catch (InterruptedException e) { + } + try { + thread2.join(); + } catch (InterruptedException e) { + } + Assert.assertEquals(queues*totalRuns, count); + } + + public void testPopulateQueueData() { + final int queues = 30000; + final int totalRuns = 100; + + final SyncQueueManager mgr = ComponentLocator.getCurrentLocator().getManager( + SyncQueueManager.class); + for(int q = 1; q <= queues; q++) + for(int i = 0; i < totalRuns; i++) + mgr.queue("vm_instance", q, "Async-job", i+1, 1); } public void testSyncQueue() { final SyncQueueManager mgr = ComponentLocator.getCurrentLocator().getManager( SyncQueueManager.class); - mgr.queue("vm_instance", 1, "Async-job", 1); - mgr.queue("vm_instance", 1, "Async-job", 2); - mgr.queue("vm_instance", 1, "Async-job", 3); + mgr.queue("vm_instance", 1, "Async-job", 1, 1); + mgr.queue("vm_instance", 1, "Async-job", 2, 1); + mgr.queue("vm_instance", 1, "Async-job", 3, 1); mgr.dequeueFromAny(100L, 1); List l = mgr.getBlockedQueueItems(100000, false); diff --git a/setup/db/create-schema.sql b/setup/db/create-schema.sql index 5b6dc0447c7..302eadbd234 100755 --- a/setup/db/create-schema.sql +++ b/setup/db/create-schema.sql @@ -1362,16 +1362,15 @@ CREATE TABLE `cloud`.`sync_queue` ( `id` bigint unsigned NOT NULL auto_increment, `sync_objtype` varchar(64) NOT NULL, `sync_objid` bigint unsigned NOT NULL, - `queue_proc_msid` bigint, `queue_proc_number` bigint COMMENT 'process number, increase 1 for each iteration', - `queue_proc_time` datetime COMMENT 'last time to process the queue', `created` datetime COMMENT 'date created', `last_updated` datetime COMMENT 'date created', + `queue_size` smallint DEFAULT 0 COMMENT 'number of items being processed by the queue', + `queue_size_limit` smallint DEFAULT 1 COMMENT 'max number of items the queue can process concurrently', PRIMARY KEY (`id`), UNIQUE `i_sync_queue__objtype__objid`(`sync_objtype`, `sync_objid`), INDEX `i_sync_queue__created`(`created`), - INDEX `i_sync_queue__last_updated`(`last_updated`), - INDEX `i_sync_queue__queue_proc_time`(`queue_proc_time`) + INDEX `i_sync_queue__last_updated`(`last_updated`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; CREATE TABLE `cloud`.`stack_maid` ( @@ -1392,13 +1391,15 @@ CREATE TABLE `cloud`.`sync_queue_item` ( `content_id` bigint, `queue_proc_msid` bigint COMMENT 'owner msid when the queue item is being processed', `queue_proc_number` bigint COMMENT 'used to distinguish raw items and items being in process', + `queue_proc_time` datetime COMMENT 'when processing started for the item', `created` datetime COMMENT 'time created', PRIMARY KEY (`id`), CONSTRAINT `fk_sync_queue_item__queue_id` FOREIGN KEY `fk_sync_queue_item__queue_id` (`queue_id`) REFERENCES `sync_queue` (`id`) ON DELETE CASCADE, INDEX `i_sync_queue_item__queue_id`(`queue_id`), INDEX `i_sync_queue_item__created`(`created`), INDEX `i_sync_queue_item__queue_proc_number`(`queue_proc_number`), - INDEX `i_sync_queue_item__queue_proc_msid`(`queue_proc_msid`) + INDEX `i_sync_queue_item__queue_proc_msid`(`queue_proc_msid`), + INDEX `i_sync_queue__queue_proc_time`(`queue_proc_time`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; CREATE TABLE `cloud`.`disk_offering` ( @@ -2371,4 +2372,3 @@ CREATE TABLE `cloud`.`nicira_nvp_nic_map` ( ) ENGINE=InnoDB DEFAULT CHARSET=utf8; SET foreign_key_checks = 1; -