From 521ac796dd94de30c1092980cb80202588199ec3 Mon Sep 17 00:00:00 2001 From: Kelven Yang Date: Mon, 5 May 2014 15:21:59 -0700 Subject: [PATCH] Move EventBus hookup on job framework to ApiServer to decouple job framework away from business logic related hookups. The decoupling is done through internal messaging facility provided inside management server. --- .../cloudstack/framework/jobs/AsyncJob.java | 1 + .../jobs/impl/AsyncJobManagerImpl.java | 5 ++ server/src/com/cloud/api/ApiServer.java | 88 +++++++++++++++++++ 3 files changed, 94 insertions(+) diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java index e62482193ae..0ad49e1e7bf 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java @@ -30,6 +30,7 @@ public interface AsyncJob extends JobInfo { public static interface Topics { public static final String JOB_HEARTBEAT = "job.heartbeat"; public static final String JOB_STATE = "job.state"; + public static final String JOB_EVENT_PUBLISH = "job.eventpublish"; } public static interface Constants { diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java index 42148bebaed..24abcbe868b 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java @@ -60,6 +60,7 @@ import org.apache.cloudstack.utils.identity.ManagementServerNode; import com.cloud.cluster.ClusterManagerListener; import com.cloud.cluster.ManagementServerHost; import com.cloud.utils.DateUtil; +import com.cloud.utils.Pair; import com.cloud.utils.Predicate; import com.cloud.utils.component.ManagerBase; import com.cloud.utils.concurrency.NamedThreadFactory; @@ -1009,4 +1010,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } + private void publishOnEventBus(AsyncJob job, String jobEvent) { + _messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL, + new Pair(job, jobEvent)); + } } diff --git a/server/src/com/cloud/api/ApiServer.java b/server/src/com/cloud/api/ApiServer.java index 2ea717dcba6..9c65fedd6b1 100755 --- a/server/src/com/cloud/api/ApiServer.java +++ b/server/src/com/cloud/api/ApiServer.java @@ -82,6 +82,7 @@ import org.apache.http.protocol.ResponseContent; import org.apache.http.protocol.ResponseDate; import org.apache.http.protocol.ResponseServer; import org.apache.log4j.Logger; +import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.stereotype.Component; import org.apache.cloudstack.acl.APIChecker; @@ -119,9 +120,14 @@ import org.apache.cloudstack.api.response.ListResponse; import org.apache.cloudstack.context.CallContext; import org.apache.cloudstack.framework.config.dao.ConfigurationDao; import org.apache.cloudstack.framework.config.impl.ConfigurationVO; +import org.apache.cloudstack.framework.events.EventBus; +import org.apache.cloudstack.framework.events.EventBusException; import org.apache.cloudstack.framework.jobs.AsyncJob; import org.apache.cloudstack.framework.jobs.AsyncJobManager; import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO; +import org.apache.cloudstack.framework.messagebus.MessageBus; +import org.apache.cloudstack.framework.messagebus.MessageDispatcher; +import org.apache.cloudstack.framework.messagebus.MessageHandler; import org.apache.cloudstack.managed.context.ManagedContextRunnable; import com.cloud.api.dispatch.DispatchChainFactory; @@ -130,8 +136,10 @@ import com.cloud.api.response.ApiResponseSerializer; import com.cloud.configuration.Config; import com.cloud.domain.Domain; import com.cloud.domain.DomainVO; +import com.cloud.domain.dao.DomainDao; import com.cloud.event.ActionEventUtils; import com.cloud.event.EventTypes; +import com.cloud.event.EventCategory; import com.cloud.exception.AccountLimitException; import com.cloud.exception.CloudAuthenticationException; import com.cloud.exception.InsufficientCapacityException; @@ -181,6 +189,9 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer private AccountManager _accountMgr; @Inject private DomainManager _domainMgr; + @Inject + private DomainDao _domainDao; + @Inject private AsyncJobManager _asyncMgr; @Inject @@ -200,15 +211,92 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer private static ExecutorService s_executor = new ThreadPoolExecutor(10, 150, 60, TimeUnit.SECONDS, new LinkedBlockingQueue(), new NamedThreadFactory( "ApiServer")); + @Inject + MessageBus _messageBus; public ApiServer() { } @Override public boolean configure(final String name, final Map params) throws ConfigurationException { + _messageBus.subscribe(AsyncJob.Topics.JOB_EVENT_PUBLISH, MessageDispatcher.getDispatcher(this)); return true; } + @MessageHandler(topic = AsyncJob.Topics.JOB_EVENT_PUBLISH) + private void handleAsyncJobPublishEvent(String subject, String senderAddress, Object args) { + assert (args != null); + + @SuppressWarnings("unchecked") + Pair eventInfo = (Pair)args; + AsyncJob job = eventInfo.first(); + String jobEvent = eventInfo.second(); + + if (s_logger.isTraceEnabled()) + s_logger.trace("Handle asyjob publish event " + jobEvent); + + EventBus eventBus = null; + try { + eventBus = ComponentContext.getComponent(EventBus.class); + } catch (NoSuchBeanDefinitionException nbe) { + return; // no provider is configured to provide events bus, so just return + } + + if (!job.getDispatcher().equalsIgnoreCase("ApiAsyncJobDispatcher")) { + return; + } + + User userJobOwner = _accountMgr.getUserIncludingRemoved(job.getUserId()); + Account jobOwner = _accountMgr.getAccount(userJobOwner.getAccountId()); + + // Get the event type from the cmdInfo json string + String info = job.getCmdInfo(); + String cmdEventType; + if (info == null) { + cmdEventType = "unknown"; + } else { + String marker = "\"cmdEventType\""; + int begin = info.indexOf(marker); + cmdEventType = info.substring(begin + marker.length() + 2, info.indexOf(",", begin) - 1); + } + + // For some reason, the instanceType / instanceId are not abstract, which means we may get null values. + org.apache.cloudstack.framework.events.Event event = new org.apache.cloudstack.framework.events.Event( + "management-server", + EventCategory.ASYNC_JOB_CHANGE_EVENT.getName(), + jobEvent, + (job.getInstanceType() != null ? job.getInstanceType().toString() : "unknown"), null); + + Map eventDescription = new HashMap(); + eventDescription.put("command", job.getCmd()); + eventDescription.put("user", userJobOwner.getUuid()); + eventDescription.put("account", jobOwner.getUuid()); + eventDescription.put("processStatus", "" + job.getProcessStatus()); + eventDescription.put("resultCode", "" + job.getResultCode()); + eventDescription.put("instanceUuid", ApiDBUtils.findJobInstanceUuid(job)); + eventDescription.put("instanceType", (job.getInstanceType() != null ? job.getInstanceType().toString() : "unknown")); + eventDescription.put("commandEventType", cmdEventType); + eventDescription.put("jobId", job.getUuid()); + // If the event.accountinfo boolean value is set, get the human readable value for the username / domainname + Map configs = _configDao.getConfiguration("management-server", new HashMap()); + if (Boolean.valueOf(configs.get("event.accountinfo"))) { + DomainVO domain = _domainDao.findById(jobOwner.getDomainId()); + eventDescription.put("username", userJobOwner.getUsername()); + eventDescription.put("domainname", domain.getName()); + } + event.setDescription(eventDescription); + + try { + eventBus.publish(event); + } catch (EventBusException evx) { + String errMsg = "F" + + "" + + "ailed to publish async job event on the the event bus."; + s_logger.warn(errMsg, evx); + throw new CloudRuntimeException(errMsg); + } + } + @Override public boolean start() { Integer apiPort = null; // api port, null by default