Move EventBus hookup on job framework to ApiServer to decouple job framework away from business logic related hookups. The decoupling is done through internal messaging facility provided inside management server.

This commit is contained in:
Kelven Yang 2014-05-05 15:21:59 -07:00
parent fc04e4b4ec
commit 521ac796dd
3 changed files with 94 additions and 0 deletions

View File

@ -30,6 +30,7 @@ public interface AsyncJob extends JobInfo {
public static interface Topics {
public static final String JOB_HEARTBEAT = "job.heartbeat";
public static final String JOB_STATE = "job.state";
public static final String JOB_EVENT_PUBLISH = "job.eventpublish";
}
public static interface Constants {

View File

@ -60,6 +60,7 @@ import org.apache.cloudstack.utils.identity.ManagementServerNode;
import com.cloud.cluster.ClusterManagerListener;
import com.cloud.cluster.ManagementServerHost;
import com.cloud.utils.DateUtil;
import com.cloud.utils.Pair;
import com.cloud.utils.Predicate;
import com.cloud.utils.component.ManagerBase;
import com.cloud.utils.concurrency.NamedThreadFactory;
@ -1009,4 +1010,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
private void publishOnEventBus(AsyncJob job, String jobEvent) {
_messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL,
new Pair<AsyncJob, String>(job, jobEvent));
}
}

View File

@ -82,6 +82,7 @@ import org.apache.http.protocol.ResponseContent;
import org.apache.http.protocol.ResponseDate;
import org.apache.http.protocol.ResponseServer;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.stereotype.Component;
import org.apache.cloudstack.acl.APIChecker;
@ -119,9 +120,14 @@ import org.apache.cloudstack.api.response.ListResponse;
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
import org.apache.cloudstack.framework.config.impl.ConfigurationVO;
import org.apache.cloudstack.framework.events.EventBus;
import org.apache.cloudstack.framework.events.EventBusException;
import org.apache.cloudstack.framework.jobs.AsyncJob;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
import org.apache.cloudstack.framework.messagebus.MessageBus;
import org.apache.cloudstack.framework.messagebus.MessageDispatcher;
import org.apache.cloudstack.framework.messagebus.MessageHandler;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import com.cloud.api.dispatch.DispatchChainFactory;
@ -130,8 +136,10 @@ import com.cloud.api.response.ApiResponseSerializer;
import com.cloud.configuration.Config;
import com.cloud.domain.Domain;
import com.cloud.domain.DomainVO;
import com.cloud.domain.dao.DomainDao;
import com.cloud.event.ActionEventUtils;
import com.cloud.event.EventTypes;
import com.cloud.event.EventCategory;
import com.cloud.exception.AccountLimitException;
import com.cloud.exception.CloudAuthenticationException;
import com.cloud.exception.InsufficientCapacityException;
@ -181,6 +189,9 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer
private AccountManager _accountMgr;
@Inject
private DomainManager _domainMgr;
@Inject
private DomainDao _domainDao;
@Inject
private AsyncJobManager _asyncMgr;
@Inject
@ -200,15 +211,92 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer
private static ExecutorService s_executor = new ThreadPoolExecutor(10, 150, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(
"ApiServer"));
@Inject
MessageBus _messageBus;
public ApiServer() {
}
@Override
public boolean configure(final String name, final Map<String, Object> params) throws ConfigurationException {
_messageBus.subscribe(AsyncJob.Topics.JOB_EVENT_PUBLISH, MessageDispatcher.getDispatcher(this));
return true;
}
@MessageHandler(topic = AsyncJob.Topics.JOB_EVENT_PUBLISH)
private void handleAsyncJobPublishEvent(String subject, String senderAddress, Object args) {
assert (args != null);
@SuppressWarnings("unchecked")
Pair<AsyncJob, String> eventInfo = (Pair<AsyncJob, String>)args;
AsyncJob job = eventInfo.first();
String jobEvent = eventInfo.second();
if (s_logger.isTraceEnabled())
s_logger.trace("Handle asyjob publish event " + jobEvent);
EventBus eventBus = null;
try {
eventBus = ComponentContext.getComponent(EventBus.class);
} catch (NoSuchBeanDefinitionException nbe) {
return; // no provider is configured to provide events bus, so just return
}
if (!job.getDispatcher().equalsIgnoreCase("ApiAsyncJobDispatcher")) {
return;
}
User userJobOwner = _accountMgr.getUserIncludingRemoved(job.getUserId());
Account jobOwner = _accountMgr.getAccount(userJobOwner.getAccountId());
// Get the event type from the cmdInfo json string
String info = job.getCmdInfo();
String cmdEventType;
if (info == null) {
cmdEventType = "unknown";
} else {
String marker = "\"cmdEventType\"";
int begin = info.indexOf(marker);
cmdEventType = info.substring(begin + marker.length() + 2, info.indexOf(",", begin) - 1);
}
// For some reason, the instanceType / instanceId are not abstract, which means we may get null values.
org.apache.cloudstack.framework.events.Event event = new org.apache.cloudstack.framework.events.Event(
"management-server",
EventCategory.ASYNC_JOB_CHANGE_EVENT.getName(),
jobEvent,
(job.getInstanceType() != null ? job.getInstanceType().toString() : "unknown"), null);
Map<String, String> eventDescription = new HashMap<String, String>();
eventDescription.put("command", job.getCmd());
eventDescription.put("user", userJobOwner.getUuid());
eventDescription.put("account", jobOwner.getUuid());
eventDescription.put("processStatus", "" + job.getProcessStatus());
eventDescription.put("resultCode", "" + job.getResultCode());
eventDescription.put("instanceUuid", ApiDBUtils.findJobInstanceUuid(job));
eventDescription.put("instanceType", (job.getInstanceType() != null ? job.getInstanceType().toString() : "unknown"));
eventDescription.put("commandEventType", cmdEventType);
eventDescription.put("jobId", job.getUuid());
// If the event.accountinfo boolean value is set, get the human readable value for the username / domainname
Map<String, String> configs = _configDao.getConfiguration("management-server", new HashMap<String, String>());
if (Boolean.valueOf(configs.get("event.accountinfo"))) {
DomainVO domain = _domainDao.findById(jobOwner.getDomainId());
eventDescription.put("username", userJobOwner.getUsername());
eventDescription.put("domainname", domain.getName());
}
event.setDescription(eventDescription);
try {
eventBus.publish(event);
} catch (EventBusException evx) {
String errMsg = "F" +
"" +
"ailed to publish async job event on the the event bus.";
s_logger.warn(errMsg, evx);
throw new CloudRuntimeException(errMsg);
}
}
@Override
public boolean start() {
Integer apiPort = null; // api port, null by default