diff --git a/server/src/com/cloud/async/dao/AsyncJobJoinMapDao.java b/server/src/com/cloud/async/dao/AsyncJobJoinMapDao.java index fb6e160e0f4..cb81069d425 100644 --- a/server/src/com/cloud/async/dao/AsyncJobJoinMapDao.java +++ b/server/src/com/cloud/async/dao/AsyncJobJoinMapDao.java @@ -16,13 +16,19 @@ // under the License. package com.cloud.async.dao; +import java.util.List; + import com.cloud.async.AsyncJobJoinMapVO; import com.cloud.utils.db.GenericDao; public interface AsyncJobJoinMapDao extends GenericDao { + Long joinJob(long jobId, long joinJobId, long joinMsid, Long syncSourceId, String wakeupHandler, String wakeupDispatcher); void disjoinJob(long jobId, long joinedJobId); + AsyncJobJoinMapVO getJoinRecord(long jobId, long joinJobId); + List listJoinRecords(long jobId); + void completeJoin(long joinJobId, int joinStatus, String joinResult, long completeMsid); } diff --git a/server/src/com/cloud/async/dao/AsyncJobJoinMapDaoImpl.java b/server/src/com/cloud/async/dao/AsyncJobJoinMapDaoImpl.java index 5b76f1ef604..a7d3779bf03 100644 --- a/server/src/com/cloud/async/dao/AsyncJobJoinMapDaoImpl.java +++ b/server/src/com/cloud/async/dao/AsyncJobJoinMapDaoImpl.java @@ -20,7 +20,6 @@ import java.util.List; import com.cloud.async.AsyncJobConstants; import com.cloud.async.AsyncJobJoinMapVO; -import com.cloud.async.AsyncJobResult; import com.cloud.utils.DateUtil; import com.cloud.utils.db.GenericDaoBase; import com.cloud.utils.db.SearchBuilder; @@ -31,6 +30,7 @@ import com.cloud.utils.db.SearchCriteria.Op; public class AsyncJobJoinMapDaoImpl extends GenericDaoBase implements AsyncJobJoinMapDao { private final SearchBuilder RecordSearch; + private final SearchBuilder RecordSearchByOwner; private final SearchBuilder CompleteJoinSearch; public AsyncJobJoinMapDaoImpl() { @@ -38,6 +38,10 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase listJoinRecords(long jobId) { + SearchCriteria sc = RecordSearchByOwner.create(); + sc.setParameters("jobId", jobId); + + return this.listBy(sc); + } + public void completeJoin(long joinJobId, int joinStatus, String joinResult, long completeMsid) { AsyncJobJoinMapVO record = createForUpdate(); record.setJoinStatus(joinStatus); diff --git a/server/src/com/cloud/vm/VmWorkJobDispatcher.java b/server/src/com/cloud/vm/VmWorkJobDispatcher.java index fc0305645c3..631d9f15339 100644 --- a/server/src/com/cloud/vm/VmWorkJobDispatcher.java +++ b/server/src/com/cloud/vm/VmWorkJobDispatcher.java @@ -29,7 +29,6 @@ import com.cloud.async.AsyncJob; import com.cloud.async.AsyncJobConstants; import com.cloud.async.AsyncJobDispatcher; import com.cloud.async.AsyncJobManager; -import com.cloud.async.AsyncJobResult; import com.cloud.user.AccountVO; import com.cloud.user.UserContext; import com.cloud.user.dao.AccountDao; diff --git a/server/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java b/server/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java new file mode 100644 index 00000000000..2e57c7948dd --- /dev/null +++ b/server/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.cloud.vm; + +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.inject.Inject; + +import org.apache.log4j.Logger; + +import com.cloud.api.ApiSerializerHelper; +import com.cloud.async.AsyncJob; +import com.cloud.async.AsyncJobDispatcher; +import com.cloud.async.AsyncJobJoinMapVO; +import com.cloud.async.dao.AsyncJobJoinMapDao; +import com.cloud.user.AccountVO; +import com.cloud.user.UserContext; +import com.cloud.user.dao.AccountDao; +import com.cloud.utils.component.AdapterBase; +import com.cloud.vm.dao.VMInstanceDao; + +public class VmWorkJobWakeupDispatcher extends AdapterBase implements AsyncJobDispatcher { + private static final Logger s_logger = Logger.getLogger(VmWorkJobWakeupDispatcher.class); + + @Inject private VmWorkJobDao _workjobDao; + @Inject private AsyncJobJoinMapDao _joinMapDao; + @Inject private AccountDao _accountDao; + @Inject private VMInstanceDao _instanceDao; + @Inject private VirtualMachineManager _vmMgr; + + private Map _handlerMap = new HashMap(); + + @Override + public void RunJob(AsyncJob job) { + try { + List joinRecords =_joinMapDao.listJoinRecords(job.getId()); + if(joinRecords.size() != 1) { + s_logger.warn("Job-" + job.getId() + + " received wakeup call with un-supported joining job number: " + joinRecords.size()); + + job.setSyncSource(null); + return; + } + + AsyncJobJoinMapVO joinRecord = joinRecords.get(0); + VmWorkJobVO joinedJob = _workjobDao.findById(joinRecord.getJoinJobId()); + + // get original work context information from joined job + VmWork work = (VmWork)ApiSerializerHelper.fromSerializedString(joinedJob.getCmdInfo()); + assert(work != null); + + AccountVO account = _accountDao.findById(work.getAccountId()); + assert(account != null); + + VMInstanceVO vm = _instanceDao.findById(work.getVmId()); + assert(vm != null); + + UserContext.registerContext(work.getUserId(), account, null, false); + try { + Method handler = getHandler(joinRecord.getWakeupHandler()); + if(handler != null) { + handler.invoke(_vmMgr); + } else { + assert(false); + s_logger.error("Unable to find wakeup handler " + joinRecord.getWakeupHandler() + + " when waking up job-" + job.getId()); + } + } finally { + UserContext.unregisterContext(); + } + } catch(Throwable e) { + s_logger.warn("Unexpected exception in waking up job-" + job.getId()); + + job.setSyncSource(null); + } + } + + private Method getHandler(String wakeupHandler) { + + synchronized(_handlerMap) { + Class clz = _vmMgr.getClass(); + Method method = _handlerMap.get(wakeupHandler); + if(method != null) + return method; + + try { + method = clz.getMethod(wakeupHandler); + method.setAccessible(true); + } catch (SecurityException e) { + assert(false); + s_logger.error("Unexpected exception", e); + return null; + } catch (NoSuchMethodException e) { + assert(false); + s_logger.error("Unexpected exception", e); + return null; + } + + _handlerMap.put(wakeupHandler, method); + return method; + } + } +}