From a8a09ba2879b82b51b3d96eb84288d2380cf6c06 Mon Sep 17 00:00:00 2001 From: Kelven Yang Date: Mon, 6 May 2013 10:01:46 -0700 Subject: [PATCH] hook-up new sync with VirtualMachineGuru(s) --- api/src/com/cloud/async/AsyncJob.java | 7 +- .../com/cloud/async/AsyncJobJournalVO.java | 108 ++++++ core/src/com/cloud/async/AsyncJobVO.java | 12 - .../cloud/serializer/SerializerHelper.java | 47 ++- .../lb/ElasticLoadBalancerManagerImpl.java | 9 + .../cloud/async/AsyncJobExecutionContext.java | 25 ++ .../src/com/cloud/async/AsyncJobManager.java | 2 + .../com/cloud/async/AsyncJobManagerImpl.java | 38 ++- .../src/com/cloud/async/AsyncJobMonitor.java | 6 +- .../com/cloud/async/dao/AsyncJobDaoImpl.java | 1 - .../cloud/async/dao/AsyncJobJournalDao.java | 26 ++ .../async/dao/AsyncJobJournalDaoImpl.java | 44 +++ .../cloud/cluster/dao/StackMaidDaoImpl.java | 2 +- .../AgentBasedConsoleProxyManager.java | 9 + .../consoleproxy/ConsoleProxyManagerImpl.java | 34 ++ .../VirtualNetworkApplianceManagerImpl.java | 9 + .../SecondaryStorageManagerImpl.java | 11 +- .../src/com/cloud/vm/UserVmManagerImpl.java | 8 + .../src/com/cloud/vm/VirtualMachineGuru.java | 7 +- .../com/cloud/vm/VirtualMachineManager.java | 7 + .../cloud/vm/VirtualMachineManagerImpl.java | 323 ++++++++++++++---- .../vm/VirtualMachinePowerStateSync.java | 6 + .../vm/VirtualMachinePowerStateSyncImpl.java | 96 ++++-- server/src/com/cloud/vm/VmWorkConstants.java | 4 +- .../src/com/cloud/vm/VmWorkJobDispatcher.java | 27 +- ...jectConstants.java => TopicConstants.java} | 2 +- .../async/AsyncJobTestConfiguration.java | 7 + .../async/MockVirtualMachineManagerImpl.java | 14 +- .../com/cloud/async/TestAsyncJobManager.java | 59 +++- .../com/cloud/vm/MockUserVmManagerImpl.java | 8 + .../vm/VirtualMachineManagerImplTest.java | 2 + .../VmWorkMockVirtualMachineManagerImpl.java | 17 +- server/test/com/cloud/vm/VmWorkTest.java | 15 + .../com/cloud/vm/VmWorkTestConfiguration.java | 7 + setup/db/db/schema-410to420.sql | 11 +- 35 files changed, 871 insertions(+), 139 deletions(-) create mode 100644 core/src/com/cloud/async/AsyncJobJournalVO.java create mode 100644 server/src/com/cloud/async/dao/AsyncJobJournalDao.java create mode 100644 server/src/com/cloud/async/dao/AsyncJobJournalDaoImpl.java rename server/src/org/apache/cloudstack/messagebus/{SubjectConstants.java => TopicConstants.java} (96%) diff --git a/api/src/com/cloud/async/AsyncJob.java b/api/src/com/cloud/async/AsyncJob.java index c8dc6422146..0d61f92d71d 100644 --- a/api/src/com/cloud/async/AsyncJob.java +++ b/api/src/com/cloud/async/AsyncJob.java @@ -22,8 +22,11 @@ import org.apache.cloudstack.api.InternalIdentity; import java.util.Date; public interface AsyncJob extends Identity, InternalIdentity { - Long getParentId(); - + + public enum JournalType { + SUCCESS, FAILURE + }; + String getType(); String getDispatcher(); diff --git a/core/src/com/cloud/async/AsyncJobJournalVO.java b/core/src/com/cloud/async/AsyncJobJournalVO.java new file mode 100644 index 00000000000..1cf57e93c06 --- /dev/null +++ b/core/src/com/cloud/async/AsyncJobJournalVO.java @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.async; + +import java.util.Date; + +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.EnumType; +import javax.persistence.Enumerated; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.persistence.Table; + +import com.cloud.utils.DateUtil; +import com.cloud.utils.db.GenericDao; + +@Entity +@Table(name="async_job_journal") +public class AsyncJobJournalVO { + @Id + @GeneratedValue(strategy=GenerationType.IDENTITY) + @Column(name="id") + private Long id = null; + + @Column(name="job_id") + private long jobId; + + @Column(name="journal_type", updatable=false, nullable=false, length=32) + @Enumerated(value=EnumType.STRING) + private AsyncJob.JournalType journalType; + + @Column(name="journal_text", length=1024) + private String journalText; + + @Column(name="journal_obj", length=1024) + private String journalObjJsonString; + + @Column(name=GenericDao.CREATED_COLUMN) + protected Date created; + + public AsyncJobJournalVO() { + created = DateUtil.currentGMTTime(); + } + + public Long getId() { + return id; + } + + public void setId(Long id) { + this.id = id; + } + + public long getJobId() { + return jobId; + } + + public void setJobId(long jobId) { + this.jobId = jobId; + } + + public AsyncJob.JournalType getJournalType() { + return journalType; + } + + public void setJournalType(AsyncJob.JournalType journalType) { + this.journalType = journalType; + } + + public String getJournalText() { + return journalText; + } + + public void setJournalText(String journalText) { + this.journalText = journalText; + } + + public String getJournalObjJsonString() { + return journalObjJsonString; + } + + public void setJournalObjJsonString(String journalObjJsonString) { + this.journalObjJsonString = journalObjJsonString; + } + + public Date getCreated() { + return created; + } + + public void setCreated(Date created) { + this.created = created; + } +} diff --git a/core/src/com/cloud/async/AsyncJobVO.java b/core/src/com/cloud/async/AsyncJobVO.java index 53d2e1eac6d..bfb2416fbc3 100644 --- a/core/src/com/cloud/async/AsyncJobVO.java +++ b/core/src/com/cloud/async/AsyncJobVO.java @@ -46,9 +46,6 @@ public class AsyncJobVO implements AsyncJob { @Column(name="id") private Long id = null; - @Column(name="parent_id") - private Long parentId; - @Column(name="job_type", length=32) protected String type; @@ -142,15 +139,6 @@ public class AsyncJobVO implements AsyncJob { this.id = id; } - @Override - public Long getParentId() { - return this.parentId; - } - - public void setParentId(Long parentId) { - this.parentId = parentId; - } - @Override public String getType() { return this.type; diff --git a/core/src/com/cloud/serializer/SerializerHelper.java b/core/src/com/cloud/serializer/SerializerHelper.java index dd858b79223..e3449f6e443 100644 --- a/core/src/com/cloud/serializer/SerializerHelper.java +++ b/core/src/com/cloud/serializer/SerializerHelper.java @@ -16,6 +16,12 @@ // under the License. package com.cloud.serializer; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -24,6 +30,7 @@ import java.util.ArrayList; import java.util.Date; import java.util.List; +import org.apache.commons.codec.binary.Base64; import org.apache.log4j.Logger; import com.cloud.utils.DateUtil; @@ -37,7 +44,7 @@ public class SerializerHelper { public static final Logger s_logger = Logger.getLogger(SerializerHelper.class.getName()); public static String token = "/"; - public static String toSerializedStringOld(Object result) { + public static String toSerializedString(Object result) { if(result != null) { Class clz = result.getClass(); Gson gson = GsonHelper.getGson(); @@ -83,6 +90,44 @@ public class SerializerHelper { throw e; } } + + public static String toObjectSerializedString(Serializable object) { + assert(object != null); + + ByteArrayOutputStream bs = new ByteArrayOutputStream(); + try { + ObjectOutputStream os = new ObjectOutputStream(bs); + os.writeObject(object); + os.close(); + bs.close(); + + return Base64.encodeBase64URLSafeString(bs.toByteArray()); + } catch(IOException e) { + s_logger.error("Unexpected exception", e); + } + return null; + } + + public static Object fromObjectSerializedString(String base64EncodedString) { + if(base64EncodedString == null) + return null; + + byte[] content = Base64.decodeBase64(base64EncodedString); + ByteArrayInputStream bs = new ByteArrayInputStream(content); + try { + ObjectInputStream is = new ObjectInputStream(bs); + Object obj = is.readObject(); + is.close(); + bs.close(); + return obj; + } catch(IOException e) { + s_logger.error("Unexpected exception", e); + } catch(ClassNotFoundException e) { + s_logger.error("Unexpected exception", e); + } + + return null; + } public static List> toPairList(Object o, String name) { List> l = new ArrayList>(); diff --git a/plugins/network-elements/elastic-loadbalancer/src/com/cloud/network/lb/ElasticLoadBalancerManagerImpl.java b/plugins/network-elements/elastic-loadbalancer/src/com/cloud/network/lb/ElasticLoadBalancerManagerImpl.java index 283b517dce9..0c5f972f62f 100644 --- a/plugins/network-elements/elastic-loadbalancer/src/com/cloud/network/lb/ElasticLoadBalancerManagerImpl.java +++ b/plugins/network-elements/elastic-loadbalancer/src/com/cloud/network/lb/ElasticLoadBalancerManagerImpl.java @@ -131,6 +131,7 @@ import com.cloud.vm.NicProfile; import com.cloud.vm.ReservationContext; import com.cloud.vm.VMInstanceVO; import com.cloud.vm.VirtualMachine; +import com.cloud.vm.VmWork; import com.cloud.vm.VirtualMachine.State; import com.cloud.vm.VirtualMachineGuru; import com.cloud.vm.VirtualMachineManager; @@ -1001,5 +1002,13 @@ ElasticLoadBalancerManager, VirtualMachineGuru { @Override public void prepareStop(VirtualMachineProfile profile) { } + + @Override + public void vmWorkStart(VmWork work) { + } + + @Override + public void vmWorkStop(VmWork work) { + } } diff --git a/server/src/com/cloud/async/AsyncJobExecutionContext.java b/server/src/com/cloud/async/AsyncJobExecutionContext.java index f71a391e4bd..c02797512cc 100644 --- a/server/src/com/cloud/async/AsyncJobExecutionContext.java +++ b/server/src/com/cloud/async/AsyncJobExecutionContext.java @@ -16,9 +16,13 @@ // under the License. package com.cloud.async; +import javax.inject.Inject; + public class AsyncJobExecutionContext { private AsyncJob _job; + @Inject private AsyncJobManager _jobMgr; + private static ThreadLocal s_currentExectionContext = new ThreadLocal(); public AsyncJobExecutionContext() { @@ -40,6 +44,27 @@ public class AsyncJobExecutionContext { _job = job; } + public void completeAsyncJob(int jobStatus, int resultCode, Object resultObject) { + assert(_job != null); + _jobMgr.completeAsyncJob(_job.getId(), jobStatus, resultCode, resultObject); + } + + public void updateAsyncJobStatus(int processStatus, Object resultObject) { + assert(_job != null); + _jobMgr.updateAsyncJobStatus(_job.getId(), processStatus, resultObject); + } + + public void updateAsyncJobAttachment(String instanceType, Long instanceId) { + assert(_job != null); + _jobMgr.updateAsyncJobAttachment(_job.getId(), instanceType, instanceId); + } + + public void logJobJournal(AsyncJob.JournalType journalType, String + journalText, String journalObjJson) { + assert(_job != null); + _jobMgr.logJobJournal(_job.getId(), journalType, journalText, journalObjJson); + } + public static AsyncJobExecutionContext getCurrentExecutionContext() { return s_currentExectionContext.get(); } diff --git a/server/src/com/cloud/async/AsyncJobManager.java b/server/src/com/cloud/async/AsyncJobManager.java index fc8de18f17c..8cafa5fb060 100644 --- a/server/src/com/cloud/async/AsyncJobManager.java +++ b/server/src/com/cloud/async/AsyncJobManager.java @@ -37,6 +37,8 @@ public interface AsyncJobManager extends Manager { public void completeAsyncJob(long jobId, int jobStatus, int resultCode, Object resultObject); public void updateAsyncJobStatus(long jobId, int processStatus, Object resultObject); public void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId); + public void logJobJournal(long jobId, AsyncJob.JournalType journalType, String + journalText, String journalObjJson); public void releaseSyncSource(); public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit); diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java index f091d5d32b6..468782ef2f9 100644 --- a/server/src/com/cloud/async/AsyncJobManagerImpl.java +++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java @@ -43,6 +43,7 @@ import org.apache.log4j.NDC; import com.cloud.api.ApiSerializerHelper; import com.cloud.async.dao.AsyncJobDao; +import com.cloud.async.dao.AsyncJobJournalDao; import com.cloud.cluster.ClusterManager; import com.cloud.cluster.ClusterManagerListener; import com.cloud.cluster.ManagementServerHostVO; @@ -58,6 +59,7 @@ import com.cloud.utils.DateUtil; import com.cloud.utils.NumbersUtil; import com.cloud.utils.Predicate; import com.cloud.utils.PropertiesUtil; +import com.cloud.utils.component.ComponentContext; import com.cloud.utils.component.ManagerBase; import com.cloud.utils.concurrency.NamedThreadFactory; import com.cloud.utils.db.DB; @@ -82,6 +84,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, @Inject private ClusterManager _clusterMgr; @Inject private AccountManager _accountMgr; @Inject private AsyncJobDao _jobDao; + @Inject private AsyncJobJournalDao _journalDao; @Inject private ConfigurationDao _configDao; @Inject private List _jobDispatchers; @Inject private MessageBus _messageBus; @@ -175,9 +178,9 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, ", resultCode: " + resultCode + ", result: " + resultObject); } - Transaction txt = Transaction.currentTxn(); + Transaction txn = Transaction.currentTxn(); try { - txt.start(); + txn.start(); AsyncJobVO job = _jobDao.findById(jobId); if(job == null) { if(s_logger.isDebugEnabled()) { @@ -185,9 +188,18 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, ", resultCode: " + resultCode + ", result: " + resultObject); } - txt.rollback(); + txn.rollback(); return; } + + if(job.getStatus() != AsyncJobResult.STATUS_IN_PROGRESS) { + if(s_logger.isDebugEnabled()) { + s_logger.debug("job-" + jobId + " is already completed."); + } + + txn.rollback(); + return; + } job.setCompleteMsid(getMsid()); job.setStatus(jobStatus); @@ -203,10 +215,10 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, job.setLastUpdated(DateUtil.currentGMTTime()); _jobDao.update(jobId, job); - txt.commit(); + txn.commit(); } catch(Exception e) { s_logger.error("Unexpected exception while completing async job-" + jobId, e); - txt.rollback(); + txn.rollback(); } } @@ -267,6 +279,18 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } } + @Override @DB + public void logJobJournal(long jobId, AsyncJob.JournalType journalType, String + journalText, String journalObjJson) { + AsyncJobJournalVO journal = new AsyncJobJournalVO(); + journal.setJobId(jobId); + journal.setJournalType(journalType); + journal.setJournalText(journalText); + journal.setJournalObjJsonString(journalObjJson); + + _journalDao.persist(journal); + } + @Override public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit) { if(s_logger.isDebugEnabled()) { @@ -420,7 +444,9 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, s_logger.warn("Unable to register active job " + job.getId() + " to JMX monitoring due to exception " + ExceptionUtil.toString(e)); } - AsyncJobExecutionContext.setCurrentExecutionContext(new AsyncJobExecutionContext(job)); + AsyncJobExecutionContext.setCurrentExecutionContext( + (AsyncJobExecutionContext)ComponentContext.inject(new AsyncJobExecutionContext(job)) + ); // execute the job if(s_logger.isDebugEnabled()) { diff --git a/server/src/com/cloud/async/AsyncJobMonitor.java b/server/src/com/cloud/async/AsyncJobMonitor.java index 4208444cab9..bd5b2cd1df3 100644 --- a/server/src/com/cloud/async/AsyncJobMonitor.java +++ b/server/src/com/cloud/async/AsyncJobMonitor.java @@ -27,7 +27,7 @@ import javax.naming.ConfigurationException; import org.apache.cloudstack.framework.messagebus.MessageBus; import org.apache.cloudstack.framework.messagebus.MessageDispatcher; import org.apache.cloudstack.framework.messagebus.MessageHandler; -import org.apache.cloudstack.messagebus.SubjectConstants; +import org.apache.cloudstack.messagebus.TopicConstants; import org.apache.log4j.Logger; import com.cloud.utils.component.ManagerBase; @@ -66,7 +66,7 @@ public class AsyncJobMonitor extends ManagerBase { _inactivityWarningThresholdMs = thresholdMs; } - @MessageHandler(topic=SubjectConstants.JOB_HEARTBEAT) + @MessageHandler(topic=TopicConstants.JOB_HEARTBEAT) public void onJobHeartbeatNotify(String subject, String senderAddress, Object args) { if(args != null && args instanceof Long) { synchronized(this) { @@ -93,7 +93,7 @@ public class AsyncJobMonitor extends ManagerBase { public boolean configure(String name, Map params) throws ConfigurationException { - _messageBus.subscribe(SubjectConstants.JOB_HEARTBEAT, MessageDispatcher.getDispatcher(this)); + _messageBus.subscribe(TopicConstants.JOB_HEARTBEAT, MessageDispatcher.getDispatcher(this)); _timer.scheduleAtFixedRate(new TimerTask() { @Override diff --git a/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java b/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java index 0e51b3af0eb..992b94230ac 100644 --- a/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java +++ b/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java @@ -23,7 +23,6 @@ import java.util.List; import org.apache.log4j.Logger; -import com.cloud.async.AsyncJob; import com.cloud.async.AsyncJobResult; import com.cloud.async.AsyncJobVO; import com.cloud.utils.db.DB; diff --git a/server/src/com/cloud/async/dao/AsyncJobJournalDao.java b/server/src/com/cloud/async/dao/AsyncJobJournalDao.java new file mode 100644 index 00000000000..6cfbe931ed8 --- /dev/null +++ b/server/src/com/cloud/async/dao/AsyncJobJournalDao.java @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.async.dao; + +import java.util.List; + +import com.cloud.async.AsyncJobJournalVO; +import com.cloud.utils.db.GenericDao; + +public interface AsyncJobJournalDao extends GenericDao { + List getJobJournal(long jobId); +} diff --git a/server/src/com/cloud/async/dao/AsyncJobJournalDaoImpl.java b/server/src/com/cloud/async/dao/AsyncJobJournalDaoImpl.java new file mode 100644 index 00000000000..5dd0141907b --- /dev/null +++ b/server/src/com/cloud/async/dao/AsyncJobJournalDaoImpl.java @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.async.dao; + +import java.util.List; + +import com.cloud.async.AsyncJobJournalVO; +import com.cloud.utils.db.GenericDaoBase; +import com.cloud.utils.db.SearchBuilder; +import com.cloud.utils.db.SearchCriteria; +import com.cloud.utils.db.SearchCriteria.Op; + +public class AsyncJobJournalDaoImpl extends GenericDaoBase implements AsyncJobJournalDao { + + private final SearchBuilder JobJournalSearch; + + public AsyncJobJournalDaoImpl() { + JobJournalSearch = createSearchBuilder(); + JobJournalSearch.and("jobId", JobJournalSearch.entity().getJobId(), Op.EQ); + JobJournalSearch.done(); + } + + @Override + public List getJobJournal(long jobId) { + SearchCriteria sc = JobJournalSearch.create(); + sc.setParameters("jobId", jobId); + + return this.listBy(sc); + } +} diff --git a/server/src/com/cloud/cluster/dao/StackMaidDaoImpl.java b/server/src/com/cloud/cluster/dao/StackMaidDaoImpl.java index 243b00ff4a3..59d1a33aa65 100644 --- a/server/src/com/cloud/cluster/dao/StackMaidDaoImpl.java +++ b/server/src/com/cloud/cluster/dao/StackMaidDaoImpl.java @@ -91,7 +91,7 @@ public class StackMaidDaoImpl extends GenericDaoBase impleme delegateItem.setThreadId(Thread.currentThread().getId()); delegateItem.setSeq(seq); delegateItem.setDelegate(delegateClzName); - delegateItem.setContext(SerializerHelper.toSerializedStringOld(context)); + delegateItem.setContext(SerializerHelper.toSerializedString(context)); delegateItem.setCreated(DateUtil.currentGMTTime()); super.persist(delegateItem); diff --git a/server/src/com/cloud/consoleproxy/AgentBasedConsoleProxyManager.java b/server/src/com/cloud/consoleproxy/AgentBasedConsoleProxyManager.java index 6f8575d751c..58cd660bb26 100755 --- a/server/src/com/cloud/consoleproxy/AgentBasedConsoleProxyManager.java +++ b/server/src/com/cloud/consoleproxy/AgentBasedConsoleProxyManager.java @@ -59,6 +59,7 @@ import com.cloud.vm.VirtualMachineGuru; import com.cloud.vm.VirtualMachineManager; import com.cloud.vm.VirtualMachineName; import com.cloud.vm.VirtualMachineProfile; +import com.cloud.vm.VmWork; import com.cloud.vm.dao.ConsoleProxyDao; import com.cloud.vm.dao.UserVmDao; import com.cloud.vm.dao.VMInstanceDao; @@ -356,4 +357,12 @@ public class AgentBasedConsoleProxyManager extends ManagerBase implements Consol @Override public void prepareStop(VirtualMachineProfile profile) { } + + @Override + public void vmWorkStart(VmWork work) { + } + + @Override + public void vmWorkStop(VmWork work) { + } } diff --git a/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java b/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java index 1edd8692ec7..7ee889b1c6e 100755 --- a/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java +++ b/server/src/com/cloud/consoleproxy/ConsoleProxyManagerImpl.java @@ -55,11 +55,14 @@ import com.cloud.agent.api.to.NicTO; import com.cloud.agent.api.to.VirtualMachineTO; import com.cloud.agent.manager.Commands; import com.cloud.api.commands.DestroyConsoleProxyCmd; +import com.cloud.async.AsyncJobExecutionContext; +import com.cloud.async.AsyncJobResult; import com.cloud.certificate.dao.CertificateDao; import com.cloud.cluster.ClusterManager; import com.cloud.configuration.Config; import com.cloud.configuration.ZoneConfig; import com.cloud.configuration.dao.ConfigurationDao; +import com.cloud.dao.EntityManager; import com.cloud.dc.DataCenter; import com.cloud.dc.DataCenter.NetworkType; import com.cloud.dc.DataCenterVO; @@ -107,6 +110,7 @@ import com.cloud.resource.ResourceManager; import com.cloud.resource.ResourceStateAdapter; import com.cloud.resource.ServerResource; import com.cloud.resource.UnableDeleteHostException; +import com.cloud.serializer.SerializerHelper; import com.cloud.service.ServiceOfferingVO; import com.cloud.service.dao.ServiceOfferingDao; import com.cloud.servlet.ConsoleProxyServlet; @@ -121,8 +125,10 @@ import com.cloud.storage.dao.VMTemplateHostDao; import com.cloud.template.TemplateManager; import com.cloud.user.Account; import com.cloud.user.AccountManager; +import com.cloud.user.AccountVO; import com.cloud.user.User; import com.cloud.user.UserContext; +import com.cloud.user.UserVO; import com.cloud.utils.DateUtil; import com.cloud.utils.NumbersUtil; import com.cloud.utils.Pair; @@ -143,6 +149,7 @@ import com.cloud.vm.NicProfile; import com.cloud.vm.ReservationContext; import com.cloud.vm.SystemVmLoadScanHandler; import com.cloud.vm.SystemVmLoadScanner; +import com.cloud.vm.VmWork; import com.cloud.vm.SystemVmLoadScanner.AfterScanAction; import com.cloud.vm.VMInstanceVO; import com.cloud.vm.VirtualMachine; @@ -151,6 +158,7 @@ import com.cloud.vm.VirtualMachineGuru; import com.cloud.vm.VirtualMachineManager; import com.cloud.vm.VirtualMachineName; import com.cloud.vm.VirtualMachineProfile; +import com.cloud.vm.VmWorkStart; import com.cloud.vm.dao.ConsoleProxyDao; import com.cloud.vm.dao.UserVmDetailsDao; import com.cloud.vm.dao.VMInstanceDao; @@ -234,6 +242,9 @@ public class ConsoleProxyManagerImpl extends ManagerBase implements ConsoleProxy TemplateManager templateMgr; @Inject IPAddressDao _ipAddressDao; + + @Inject + EntityManager _entityMgr; private ConsoleProxyListener _listener; @@ -2028,4 +2039,27 @@ public class ConsoleProxyManagerImpl extends ManagerBase implements ConsoleProxy @Override public void prepareStop(VirtualMachineProfile profile) { } + + @Override + public void vmWorkStart(VmWork work) { + assert(work instanceof VmWorkStart); + + ConsoleProxyVO vm = findById(work.getVmId()); + + UserVO user = _entityMgr.findById(UserVO.class, work.getUserId()); + AccountVO account = _entityMgr.findById(AccountVO.class, work.getAccountId()); + + try { + _itMgr.processVmStartWork(vm, ((VmWorkStart)work).getParams(), + user, account, ((VmWorkStart)work).getPlan()); + } catch(Exception e) { + String result = SerializerHelper.toObjectSerializedString(e); + AsyncJobExecutionContext.getCurrentExecutionContext().completeAsyncJob(AsyncJobResult.STATUS_FAILED, 0, result); + } + } + + @Override + public void vmWorkStop(VmWork work) { + // TODO + } } diff --git a/server/src/com/cloud/network/router/VirtualNetworkApplianceManagerImpl.java b/server/src/com/cloud/network/router/VirtualNetworkApplianceManagerImpl.java index ab91059b0f3..7eff93866b7 100755 --- a/server/src/com/cloud/network/router/VirtualNetworkApplianceManagerImpl.java +++ b/server/src/com/cloud/network/router/VirtualNetworkApplianceManagerImpl.java @@ -228,6 +228,7 @@ import com.cloud.vm.ReservationContextImpl; import com.cloud.vm.UserVmVO; import com.cloud.vm.VMInstanceVO; import com.cloud.vm.VirtualMachine; +import com.cloud.vm.VmWork; import com.cloud.vm.VirtualMachine.State; import com.cloud.vm.VirtualMachineGuru; import com.cloud.vm.VirtualMachineManager; @@ -3712,4 +3713,12 @@ public class VirtualNetworkApplianceManagerImpl extends ManagerBase implements V } } } + + @Override + public void vmWorkStart(VmWork work) { + } + + @Override + public void vmWorkStop(VmWork work) { + } } diff --git a/server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java b/server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java index c94224b264c..170e4bf0d39 100755 --- a/server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java +++ b/server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java @@ -129,6 +129,7 @@ import com.cloud.vm.SecondaryStorageVm; import com.cloud.vm.SecondaryStorageVmVO; import com.cloud.vm.SystemVmLoadScanHandler; import com.cloud.vm.SystemVmLoadScanner; +import com.cloud.vm.VmWork; import com.cloud.vm.SystemVmLoadScanner.AfterScanAction; import com.cloud.vm.VirtualMachine; import com.cloud.vm.VirtualMachine.State; @@ -1471,6 +1472,14 @@ public class SecondaryStorageManagerImpl extends ManagerBase implements Secondar @Override public void prepareStop(VirtualMachineProfile profile) { - } + + @Override + public void vmWorkStart(VmWork work) { + } + + @Override + public void vmWorkStop(VmWork work) { + } + } diff --git a/server/src/com/cloud/vm/UserVmManagerImpl.java b/server/src/com/cloud/vm/UserVmManagerImpl.java index 646cf9285e3..1b00997e0bf 100755 --- a/server/src/com/cloud/vm/UserVmManagerImpl.java +++ b/server/src/com/cloud/vm/UserVmManagerImpl.java @@ -4056,5 +4056,13 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Use @Override public void prepareStop(VirtualMachineProfile profile) { } + + @Override + public void vmWorkStart(VmWork work) { + } + + @Override + public void vmWorkStop(VmWork work) { + } } diff --git a/server/src/com/cloud/vm/VirtualMachineGuru.java b/server/src/com/cloud/vm/VirtualMachineGuru.java index a4f0f90cc9a..b3fafe386cf 100644 --- a/server/src/com/cloud/vm/VirtualMachineGuru.java +++ b/server/src/com/cloud/vm/VirtualMachineGuru.java @@ -116,5 +116,10 @@ public interface VirtualMachineGuru { */ void prepareStop(VirtualMachineProfile profile); - void doVmStart(VmWork work); + /** + * VM work handlers + * @param work + */ + void vmWorkStart(VmWork work); + void vmWorkStop(VmWork work); } diff --git a/server/src/com/cloud/vm/VirtualMachineManager.java b/server/src/com/cloud/vm/VirtualMachineManager.java index c98cc913dcd..1032abc58b0 100644 --- a/server/src/com/cloud/vm/VirtualMachineManager.java +++ b/server/src/com/cloud/vm/VirtualMachineManager.java @@ -94,6 +94,8 @@ public interface VirtualMachineManager extends Manager { Collection> getRegisteredGurus(); + VirtualMachineGuru getVmGuru(T vm); + boolean stateTransitTo(VMInstanceVO vm, VirtualMachine.Event e, Long hostId) throws NoTransitionException; T advanceStart(T vm, Map params, User caller, Account account) throws InsufficientCapacityException, ResourceUnavailableException, ConcurrentOperationException, OperationTimedoutException; @@ -201,4 +203,9 @@ public interface VirtualMachineManager extends Manager { throws ResourceUnavailableException, ConcurrentOperationException, ManagementServerException, VirtualMachineMigrationException; + // + // VM work handlers + // + T processVmStartWork(T vm, Map params, User caller, Account account, DeploymentPlan planToDeploy) + throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException; } diff --git a/server/src/com/cloud/vm/VirtualMachineManagerImpl.java b/server/src/com/cloud/vm/VirtualMachineManagerImpl.java index 773bd265e3a..555fc90bbd0 100755 --- a/server/src/com/cloud/vm/VirtualMachineManagerImpl.java +++ b/server/src/com/cloud/vm/VirtualMachineManagerImpl.java @@ -39,7 +39,7 @@ import javax.naming.ConfigurationException; import com.cloud.capacity.CapacityManager; import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager; import org.apache.cloudstack.framework.messagebus.MessageBus; -import org.apache.cloudstack.messagebus.SubjectConstants; +import org.apache.cloudstack.messagebus.TopicConstants; import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao; import com.cloud.dc.*; @@ -55,6 +55,8 @@ import com.cloud.agent.manager.Commands; import com.cloud.agent.manager.allocator.HostAllocator; import com.cloud.alert.AlertManager; import com.cloud.api.ApiSerializerHelper; +import com.cloud.async.AsyncJob; +import com.cloud.async.AsyncJobExecutionContext; import com.cloud.async.AsyncJobManager; import com.cloud.async.AsyncJobResult; import com.cloud.cluster.ClusterManager; @@ -62,7 +64,6 @@ import com.cloud.configuration.Config; import com.cloud.configuration.ConfigurationManager; import com.cloud.configuration.dao.ConfigurationDao; import com.cloud.dc.DataCenter; -import com.cloud.dc.DataCenterVO; import com.cloud.dc.HostPodVO; import com.cloud.dc.dao.DataCenterDao; import com.cloud.dc.dao.HostPodDao; @@ -118,7 +119,6 @@ import com.cloud.storage.dao.VMTemplateDao; import com.cloud.storage.dao.VolumeDao; import com.cloud.user.Account; import com.cloud.user.AccountManager; -import com.cloud.user.AccountVO; import com.cloud.user.User; import com.cloud.user.dao.AccountDao; import com.cloud.user.dao.UserDao; @@ -143,9 +143,7 @@ import com.cloud.vm.VirtualMachine.State; import com.cloud.vm.dao.NicDao; import com.cloud.vm.dao.UserVmDao; import com.cloud.vm.dao.VMInstanceDao; -import com.cloud.vm.snapshot.VMSnapshot; import com.cloud.vm.snapshot.VMSnapshotManager; -import com.cloud.vm.snapshot.VMSnapshotVO; import com.cloud.vm.snapshot.dao.VMSnapshotDao; import com.cloud.vm.dao.UserVmDetailsDao; @@ -177,8 +175,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac protected DomainDao _domainDao; @Inject protected ClusterManager _clusterMgr; + +/* @Inject protected ItWorkDao _workDao; +*/ + @Inject protected UserVmDao _userVmDao; @Inject @@ -278,6 +280,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac return _vmGurus.values(); } } + + @Override + @SuppressWarnings("unchecked") + public VirtualMachineGuru getVmGuru(T vm) { + return (VirtualMachineGuru) _vmGurus.get(vm.getType()); + } @Override @DB @@ -357,11 +365,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac return allocate(vm, template, serviceOffering, new Pair(serviceOffering, null), null, networks, null, plan, hyperType, owner); } - @SuppressWarnings("unchecked") - private VirtualMachineGuru getVmGuru(T vm) { - return (VirtualMachineGuru) _vmGurus.get(vm.getType()); - } - @Override public boolean expunge(T vm, User caller, Account account) throws ResourceUnavailableException { try { @@ -483,6 +486,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } } +/* protected boolean checkWorkItems(VMInstanceVO vm, State state) throws ConcurrentOperationException { while (true) { ItWorkVO vo = _workDao.findByOutstandingWork(vm.getId(), state); @@ -513,9 +517,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } s_logger.debug("Waiting some more to make sure there's no activity on " + vm); } - } - +*/ + +/* @DB protected Ternary changeToStartState(VirtualMachineGuru vmGuru, T vm, User caller, Account account) throws ConcurrentOperationException { @@ -583,18 +588,52 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac throw new ConcurrentOperationException("Unable to change the state of " + vm); } +*/ - protected boolean changeState(T vm, Event event, Long hostId, ItWorkVO work, Step step) throws NoTransitionException { + @DB + protected Ternary changeToStartState(VirtualMachineGuru vmGuru, T vm, User caller, Account account) + throws ConcurrentOperationException { + + long vmId = vm.getId(); + + Ternary result = null; + Transaction txn = Transaction.currentTxn(); + txn.start(); + try { + VmWorkJobVO work = this._workJobDao.findById(AsyncJobExecutionContext.getCurrentExecutionContext().getJob().getId()); + + Journal journal = new Journal.LogJournal("Creating " + vm, s_logger); + ReservationContextImpl context = new ReservationContextImpl(work.getUuid(), journal, caller, account); + if (stateTransitTo(vm, Event.StartRequested, null, work.getUuid())) { + if (s_logger.isDebugEnabled()) { + s_logger.debug("Successfully transitioned to start state for " + vm + " reservation id = " + work.getId()); + } + result = new Ternary(vmGuru.findById(vmId), context, work); + txn.commit(); + return result; + } + } catch (NoTransitionException e) { + s_logger.warn("Unable to transition into Starting state due to " + e.getMessage()); + } finally { + if(result == null) + txn.rollback(); + } + + throw new ConcurrentOperationException("Unable to change the state of " + vm); + } + + protected boolean changeState(T vm, Event event, Long hostId, VmWorkJobVO work, Step step) throws NoTransitionException { // FIXME: We should do this better. - Step previousStep = work.getStep(); - _workDao.updateStep(work, step); + VmWorkJobVO.Step previousStep = work.getStep(); + boolean result = false; try { result = stateTransitTo(vm, event, hostId); return result; } finally { if (!result) { - _workDao.updateStep(work, previousStep); + work.setStep(previousStep); + this._workJobDao.update(work.getId(), work); } } } @@ -606,7 +645,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } @DB - public T advanceStartNew(final T vm, Map params, User caller, Account account, DeploymentPlan planToDeploy) + public T advanceStart(final T vm, Map params, User caller, Account account, DeploymentPlan planToDeploy) throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException { VmWorkJobVO workJob = null; @@ -624,7 +663,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob = pendingWorkJobs.get(0); } else { workJob = new VmWorkJobVO(); - + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); workJob.setCmd(VmWorkConstants.VM_WORK_START); @@ -657,7 +696,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac // TODO : will refactor to fully-asynchronized way in the future _jobMgr.waitAndCheck( - new String[] { SubjectConstants.VM_POWER_STATE, SubjectConstants.JOB_STATE }, + new String[] { TopicConstants.VM_POWER_STATE, TopicConstants.JOB_STATE }, 3000L, 600000L, new Predicate() { @Override @@ -677,20 +716,19 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac return vm; } - public T doVmStart(T vm, Map params, User caller, Account account, DeploymentPlan planToDeploy) + @Override + public T processVmStartWork(T vm, Map params, User caller, Account account, DeploymentPlan planToDeploy) throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException { long vmId = vm.getId(); VirtualMachineGuru vmGuru = getVmGuru(vm); vm = vmGuru.findById(vm.getId()); - Ternary start = changeToStartState(vmGuru, vm, caller, account); - if (start == null) { - return vmGuru.findById(vmId); - } + + Ternary start = changeToStartState(vmGuru, vm, caller, account); + assert(start != null); - vm = start.first(); ReservationContext ctx = start.second(); - ItWorkVO work = start.third(); + VmWorkJobVO work = start.third(); T startedVm = null; ServiceOfferingVO offering = _offeringDao.findById(vm.getServiceOfferingId()); @@ -700,7 +738,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac s_logger.debug("Trying to deploy VM, vm has dcId: " + vm.getDataCenterId() + " and podId: " + vm.getPodIdToDeployIn()); } DataCenterDeployment plan = new DataCenterDeployment(vm.getDataCenterId(), vm.getPodIdToDeployIn(), null, null, null, null, ctx); - if(planToDeploy != null && planToDeploy.getDataCenterId() != 0){ + if(planToDeploy != null && planToDeploy.getDataCenterId() != 0) { if (s_logger.isDebugEnabled()) { s_logger.debug("advanceStart: DeploymentPlan is provided, using dcId:" + planToDeploy.getDataCenterId() + ", podId: " + planToDeploy.getPodId() + ", clusterId: " + planToDeploy.getClusterId() + ", hostId: " + planToDeploy.getHostId() + ", poolId: " + planToDeploy.getPoolId()); @@ -712,8 +750,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac boolean canRetry = true; try { - Journal journal = start.second().getJournal(); - ExcludeList avoids = null; if (planToDeploy != null) { avoids = planToDeploy.getAvoids(); @@ -791,7 +827,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } if (dest != null) { avoids.addHost(dest.getHost().getId()); - journal.record("Deployment found ", vmProfile, dest); + AsyncJobExecutionContext.getCurrentExecutionContext().logJobJournal( + AsyncJob.JournalType.SUCCESS, "Deployment found, dest host: " + dest.getHost().getId(), null); break; } } @@ -846,17 +883,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac vmGuru.finalizeDeployment(cmds, vmProfile, dest, ctx); - - work = _workDao.findById(work.getId()); - if (work == null || work.getStep() != Step.Prepare) { - throw new ConcurrentOperationException("Work steps have been changed: " + work); - } - _workDao.updateStep(work, Step.Starting); + work.setStep(VmWorkJobVO.Step.Starting); + _workJobDao.update(work.getId(), work); _agentMgr.send(destHostId, cmds); - _workDao.updateStep(work, Step.Started); - + work.setStep(VmWorkJobVO.Step.Started); + _workJobDao.update(work.getId(), work); StartAnswer startAnswer = cmds.getAnswer(StartAnswer.class); if (startAnswer != null && startAnswer.getResult()) { @@ -925,9 +958,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac throw new AgentUnavailableException("Unable to start instance due to " + e.getMessage(), destHostId, e); } finally { if (startedVm == null && canRetry) { - Step prevStep = work.getStep(); - _workDao.updateStep(work, Step.Release); - if (prevStep == Step.Started || prevStep == Step.Starting) { + VmWorkJobVO.Step prevStep = work.getStep(); + _workJobDao.updateStep(work.getId(), VmWorkJobVO.Step.Release); + if (prevStep == VmWorkJobVO.Step.Started || prevStep == VmWorkJobVO.Step.Starting) { cleanup(vmGuru, vmProfile, work, Event.OperationFailed, false, caller, account); } else { //if step is not starting/started, send cleanup command with force=true @@ -951,7 +984,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac return startedVm; } - + /* @Override public T advanceStart(T vm, Map params, User caller, Account account, DeploymentPlan planToDeploy) throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException { @@ -1227,7 +1260,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac return startedVm; } - +*/ @Override public boolean stop(T vm, User user, Account account) throws ResourceUnavailableException { try { @@ -1263,18 +1296,18 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac return true; } - protected boolean cleanup(VirtualMachineGuru guru, VirtualMachineProfile profile, ItWorkVO work, Event event, boolean force, User user, Account account) { + protected boolean cleanup(VirtualMachineGuru guru, VirtualMachineProfile profile, VmWorkJobVO work, Event event, boolean force, User user, Account account) { T vm = profile.getVirtualMachine(); State state = vm.getState(); s_logger.debug("Cleaning up resources for the vm " + vm + " in " + state + " state"); if (state == State.Starting) { - Step step = work.getStep(); - if (step == Step.Starting && !force) { + VmWorkJobVO.Step step = work.getStep(); + if (step == VmWorkJobVO.Step.Starting && !force) { s_logger.warn("Unable to cleanup vm " + vm + "; work state is incorrect: " + step); return false; } - if (step == Step.Started || step == Step.Starting || step == Step.Release) { + if (step == VmWorkJobVO.Step.Started || step == VmWorkJobVO.Step.Starting || step == VmWorkJobVO.Step.Release) { if (vm.getHostId() != null) { if (!sendStop(guru, profile, force)) { s_logger.warn("Failed to stop vm " + vm + " in " + State.Starting + " state as a part of cleanup process"); @@ -1283,7 +1316,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } } - if (step != Step.Release && step != Step.Prepare && step != Step.Started && step != Step.Starting) { + if (step != VmWorkJobVO.Step.Release && step != VmWorkJobVO.Step.Prepare && step != VmWorkJobVO.Step.Started && step != VmWorkJobVO.Step.Starting) { s_logger.debug("Cleanup is not needed for vm " + vm + "; work state is incorrect: " + step); return true; } @@ -1325,7 +1358,146 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac s_logger.debug("Successfully cleanued up resources for the vm " + vm + " in " + state + " state"); return true; } + + @Override + public boolean advanceStop(T vm, boolean forced, User user, Account account) throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException { + // ??? + return true; + } + + public boolean processVmStopWork(T vm, boolean forced, User user, Account account) throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException { + VmWorkJobVO work = _workJobDao.findById(AsyncJobExecutionContext.getCurrentExecutionContext().getJob().getId()); + Long hostId = vm.getHostId(); + if (hostId == null) { + if (!forced) { + if (s_logger.isDebugEnabled()) { + s_logger.debug("HostId is null but this is not a forced stop, cannot stop vm " + vm + " with state:" + vm.getState()); + } + return false; + } + + try { + stateTransitTo(vm, Event.AgentReportStopped, null, null); + } catch (NoTransitionException e) { + s_logger.warn(e.getMessage()); + } + + _workJobDao.updateStep(work.getId(), VmWorkJobVO.Step.Done); + return true; + } + + VirtualMachineGuru vmGuru = getVmGuru(vm); + VirtualMachineProfile profile = new VirtualMachineProfileImpl(vm); + + try { + if (!stateTransitTo(vm, Event.StopRequested, vm.getHostId())) { + throw new ConcurrentOperationException("VM is being operated on."); + } + } catch (NoTransitionException e1) { + if (!forced) { + throw new CloudRuntimeException("We cannot stop " + vm + " when it is in state " + vm.getState()); + } + + State state = vm.getState(); + boolean doCleanup = false; + if (s_logger.isDebugEnabled()) { + s_logger.debug("Unable to transition the state but we're moving on because it's forced stop"); + } + if (state == State.Starting || state == State.Migrating) { + doCleanup = true; + } else if (state == State.Stopping) { + doCleanup = true; + } + + if (doCleanup) { + if (cleanup(vmGuru, new VirtualMachineProfileImpl(vm), work, Event.StopRequested, forced, user, account)) { + try { + if (s_logger.isDebugEnabled()) { + s_logger.debug("Updating work item to Done, id:" + work.getId()); + } + return changeState(vm, Event.AgentReportStopped, null, work, Step.Done); + } catch (NoTransitionException e) { + s_logger.warn("Unable to cleanup " + vm); + return false; + } + } else { + if (s_logger.isDebugEnabled()) { + s_logger.debug("Failed to cleanup VM: " + vm); + } + throw new CloudRuntimeException("Failed to cleanup " + vm + " , current state " + vm.getState()); + } + } + } + + vmGuru.prepareStop(profile); + + StopCommand stop = new StopCommand(vm); + boolean stopped = false; + StopAnswer answer = null; + try { + answer = (StopAnswer) _agentMgr.send(vm.getHostId(), stop); + stopped = answer.getResult(); + if (!stopped) { + throw new CloudRuntimeException("Unable to stop the virtual machine due to " + answer.getDetails()); + } + vmGuru.finalizeStop(profile, answer); + + } catch (AgentUnavailableException e) { + } catch (OperationTimedoutException e) { + } finally { + if (!stopped) { + if (!forced) { + s_logger.warn("Unable to stop vm " + vm); + try { + stateTransitTo(vm, Event.OperationFailed, vm.getHostId()); + } catch (NoTransitionException e) { + s_logger.warn("Unable to transition the state " + vm); + } + return false; + } else { + s_logger.warn("Unable to actually stop " + vm + " but continue with release because it's a force stop"); + vmGuru.finalizeStop(profile, answer); + } + } + } + + if (s_logger.isDebugEnabled()) { + s_logger.debug(vm + " is stopped on the host. Proceeding to release resource held."); + } + + try { + _networkMgr.release(profile, forced); + s_logger.debug("Successfully released network resources for the vm " + vm); + } catch (Exception e) { + s_logger.warn("Unable to release some network resources.", e); + } + + try { + if (vm.getHypervisorType() != HypervisorType.BareMetal) { + this.volumeMgr.release(profile); + s_logger.debug("Successfully released storage resources for the vm " + vm); + } + } catch (Exception e) { + s_logger.warn("Unable to release storage resources.", e); + } + + try { + if (work != null) { + if (s_logger.isDebugEnabled()) { + s_logger.debug("Updating the outstanding work item to Done, id:" + work.getId()); + } + _workJobDao.updateStep(work.getId(), VmWorkJobVO.Step.Done); + } + + return stateTransitTo(vm, Event.OperationSucceeded, null, null); + } catch (NoTransitionException e) { + s_logger.warn(e.getMessage()); + return false; + } + } + +/* @Override public boolean advanceStop(T vm, boolean forced, User user, Account account) throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException { State state = vm.getState(); @@ -1492,7 +1664,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac return false; } } - +*/ private void setStateMachine() { _stateMachine = VirtualMachine.State.getStateMachine(); } @@ -1504,12 +1676,16 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public boolean stateTransitTo(VMInstanceVO vm, VirtualMachine.Event e, Long hostId) throws NoTransitionException { + +/* + * TODO ??? + * // if there are active vm snapshots task, state change is not allowed - if(_vmSnapshotMgr.hasActiveVMSnapshotTasks(vm.getId())){ + if(_vmSnapshotMgr.hasActiveVMSnapshotTasks(vm.getId())) { s_logger.error("State transit with event: " + e + " failed due to: " + vm.getInstanceName() + " has active VM snapshots tasks"); return false; } - +*/ State oldState = vm.getState(); if (oldState == State.Starting) { if (e == Event.OperationSucceeded) { @@ -1632,9 +1808,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public T migrate(T vm, long srcHostId, DeployDestination dest) throws ResourceUnavailableException, ConcurrentOperationException, ManagementServerException, - VirtualMachineMigrationException { + VirtualMachineMigrationException { s_logger.info("Migrating " + vm + " to " + dest); - + + return vm; + +/* long dstHostId = dest.getHost().getId(); Host fromHost = _hostDao.findById(srcHostId); if (fromHost == null) { @@ -1779,6 +1958,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac work.setStep(Step.Done); _workDao.update(work.getId(), work); } +*/ } @Override @@ -1789,6 +1969,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } protected void cancelWorkItems(long nodeId) { +/* GlobalLock scanLock = GlobalLock.getInternLock("vmmgr.cancel.workitem"); try { @@ -1825,6 +2006,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } finally { scanLock.releaseRef(); } +*/ } @Override @@ -1911,14 +2093,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } protected class CleanupTask implements Runnable { + @Override public void run() { s_logger.trace("VM Operation Thread Running"); +/* try { _workDao.cleanup(_cleanupWait); } catch (Exception e) { s_logger.error("VM Operations failed due to ", e); } +*/ } } @@ -2120,6 +2305,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac */ public void fullSync(final long clusterId, Map> newStates) { +/* if (newStates==null)return; Map infos = convertToInfos(newStates); Set set_vms = Collections.synchronizedSet(new HashSet()); @@ -2224,13 +2410,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } } } - /* else if(info == null && vm.getState() == State.Stopping) { //Handling CS-13376 - s_logger.warn("Marking the VM as Stopped as it was still stopping on the CS" +vm.getName()); - vm.setState(State.Stopped); // Setting the VM as stopped on the DB and clearing it from the host - vm.setLastHostId(vm.getHostId()); - vm.setHostId(null); - _vmDao.persist(vm); - }*/ } for (final AgentVmInfo left : infos.values()) { @@ -2248,7 +2427,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac s_logger.warn("Unable to stop a VM due to " + e.getMessage()); } } - +*/ } @@ -2353,6 +2532,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac * */ protected Command compareState(long hostId, VMInstanceVO vm, final AgentVmInfo info, final boolean fullSync, boolean trackExternalChange) { + return null; +/* State agentState = info.state; final State serverState = vm.getState(); final String serverName = vm.getInstanceName(); @@ -2523,10 +2704,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } } return command; +*/ } private void ensureVmRunningContext(long hostId, VMInstanceVO vm, Event cause) throws OperationTimedoutException, ResourceUnavailableException, NoTransitionException, InsufficientAddressCapacityException { - VirtualMachineGuru vmGuru = getVmGuru(vm); + /* + VirtualMachineGuru vmGuru = getVmGuru(vm); s_logger.debug("VM state is starting on full sync so updating it to running"); vm = findByIdAndType(vm.getType(), vm.getId()); @@ -2582,6 +2765,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac work.setStep(Step.Done); _workDao.update(work.getId(), work); } +*/ } @Override @@ -2623,6 +2807,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac if (cmd instanceof PingRoutingCommand) { PingRoutingCommand ping = (PingRoutingCommand) cmd; if (ping.getNewStates() != null && ping.getNewStates().size() > 0) { + _syncMgr.processHostVmStatePingReport(agentId, ping.getNewStates()); /* TODO Commands commands = deltaHostSync(agentId, ping.getNewStates()); @@ -2657,6 +2842,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac return; } + if(s_logger.isDebugEnabled()) + s_logger.debug("Received startup command from hypervisor host. host id: " + agent.getId()); _syncMgr.resetHostSyncState(agent.getId()); } @@ -3126,7 +3313,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac public T migrateForScale(T vm, long srcHostId, DeployDestination dest, Long oldSvcOfferingId) throws ResourceUnavailableException, ConcurrentOperationException, ManagementServerException, VirtualMachineMigrationException { s_logger.info("Migrating " + vm + " to " + dest); - + return vm; +/* Long newSvcOfferingId = vm.getServiceOfferingId(); long dstHostId = dest.getHost().getId(); Host fromHost = _hostDao.findById(srcHostId); @@ -3275,10 +3463,11 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac work.setStep(Step.Done); _workDao.update(work.getId(), work); } +*/ } @Override public VMInstanceVO reConfigureVm(VMInstanceVO vm , ServiceOffering oldServiceOffering, boolean reconfiguringOnExistingHost) throws ResourceUnavailableException, ConcurrentOperationException { - +/* long newServiceofferingId = vm.getServiceOfferingId(); ServiceOffering newServiceOffering = _configMgr.getServiceOffering(newServiceofferingId); ScaleVmCommand reconfigureCmd = new ScaleVmCommand(vm.getInstanceName(), newServiceOffering.getCpu(), @@ -3319,10 +3508,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac _capacityMgr.allocateVmCapacity(vm, false); // allocate the old capacity } } - +*/ return vm; } - - - } +} diff --git a/server/src/com/cloud/vm/VirtualMachinePowerStateSync.java b/server/src/com/cloud/vm/VirtualMachinePowerStateSync.java index ac1ddec06f9..7a23ddd81e9 100644 --- a/server/src/com/cloud/vm/VirtualMachinePowerStateSync.java +++ b/server/src/com/cloud/vm/VirtualMachinePowerStateSync.java @@ -19,8 +19,14 @@ package com.cloud.vm; import java.util.Map; import com.cloud.agent.api.HostVmStateReportEntry; +import com.cloud.vm.VirtualMachine.PowerState; public interface VirtualMachinePowerStateSync { + void resetHostSyncState(long hostId); + void processHostVmStateReport(long hostId, Map report); + + // to adapt legacy ping report + void processHostVmStatePingReport(long hostId, Map report); } diff --git a/server/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java b/server/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java index f5bedd65076..5eb41173db5 100644 --- a/server/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java +++ b/server/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java @@ -24,10 +24,11 @@ import javax.inject.Inject; import org.apache.cloudstack.framework.messagebus.MessageBus; import org.apache.cloudstack.framework.messagebus.PublishScope; -import org.apache.cloudstack.messagebus.SubjectConstants; +import org.apache.cloudstack.messagebus.TopicConstants; import org.apache.log4j.Logger; import com.cloud.agent.api.HostVmStateReportEntry; +import com.cloud.vm.VirtualMachine.PowerState; import com.cloud.vm.dao.VMInstanceDao; public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStateSync { @@ -48,44 +49,91 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat @Override public void processHostVmStateReport(long hostId, Map report) { - s_logger.info("Process host VM state report. host: " + hostId); + if(s_logger.isDebugEnabled()) + s_logger.debug("Process host VM state report from ping process. host: " + hostId); Map translatedInfo = convertToInfos(report); + processReport(hostId, translatedInfo); + } + + @Override + public void processHostVmStatePingReport(long hostId, Map report) { + if(s_logger.isDebugEnabled()) + s_logger.debug("Process host VM state report from ping process. host: " + hostId); + + Map translatedInfo = convertHostPingInfos(report); + processReport(hostId, translatedInfo); + } + + private void processReport(long hostId, Map translatedInfo) { for(Map.Entry entry : translatedInfo.entrySet()) { + + if(s_logger.isDebugEnabled()) + s_logger.debug("VM state report. host: " + hostId + ", vm id: " + entry.getKey() + ", power state: " + entry.getValue()); + if(_instanceDao.updatePowerState(entry.getKey(), hostId, entry.getValue())) { - _messageBus.publish(null, SubjectConstants.VM_POWER_STATE, PublishScope.GLOBAL, entry.getKey()); + + if(s_logger.isDebugEnabled()) + s_logger.debug("VM state report is updated. host: " + hostId + ", vm id: " + entry.getKey() + ", power state: " + entry.getValue()); + + _messageBus.publish(null, TopicConstants.VM_POWER_STATE, PublishScope.GLOBAL, entry.getKey()); } } } - - protected Map convertToInfos(Map states) { + + private Map convertHostPingInfos(Map states) { + final HashMap map = new HashMap(); + if (states == null) { + return map; + } + + for (Map.Entry entry : states.entrySet()) { + VMInstanceVO vm = findVM(entry.getKey()); + if(vm != null) { + map.put(vm.getId(), entry.getValue()); + break; + } else { + s_logger.info("Unable to find matched VM in CloudStack DB. name: " + entry.getKey()); + } + } + + return map; + } + + private Map convertToInfos(Map states) { final HashMap map = new HashMap(); if (states == null) { return map; } - Collection> vmGurus = _vmMgr.getRegisteredGurus(); - for (Map.Entry entry : states.entrySet()) { - for (VirtualMachineGuru vmGuru : vmGurus) { - String name = entry.getKey(); - VMInstanceVO vm = vmGuru.findByName(name); - if (vm != null) { - map.put(vm.getId(), entry.getValue().getState()); - break; - } - - Long id = vmGuru.convertToId(name); - if (id != null) { - vm = vmGuru.findById(id); - if(vm != null) { - map.put(id, entry.getValue().getState()); - break; - } - } - } + VMInstanceVO vm = findVM(entry.getKey()); + if(vm != null) { + map.put(vm.getId(), entry.getValue().getState()); + break; + } else { + s_logger.info("Unable to find matched VM in CloudStack DB. name: " + entry.getKey()); + } } return map; } + + private VMInstanceVO findVM(String vmName) { + Collection> vmGurus = _vmMgr.getRegisteredGurus(); + + for (VirtualMachineGuru vmGuru : vmGurus) { + VMInstanceVO vm = vmGuru.findByName(vmName); + if (vm != null) + return vm; + + Long id = vmGuru.convertToId(vmName); + if (id != null) { + vm = vmGuru.findById(id); + if(vm != null) + return vm; + } + } + return null; + } } diff --git a/server/src/com/cloud/vm/VmWorkConstants.java b/server/src/com/cloud/vm/VmWorkConstants.java index c435efe6737..44176ee74d8 100644 --- a/server/src/com/cloud/vm/VmWorkConstants.java +++ b/server/src/com/cloud/vm/VmWorkConstants.java @@ -22,6 +22,6 @@ public interface VmWorkConstants { public static final String VM_WORK_JOB_DISPATCHER = "VmWorkJobDispatcher"; // work job commands - public static final String VM_WORK_START = "doVmStart"; - public static final String VM_WORK_STOP = "doVmStop"; + public static final String VM_WORK_START = "vmWorkStart"; + public static final String VM_WORK_STOP = "vmWorkStop"; } diff --git a/server/src/com/cloud/vm/VmWorkJobDispatcher.java b/server/src/com/cloud/vm/VmWorkJobDispatcher.java index 4ab4aa0368c..f033b1e2f7b 100644 --- a/server/src/com/cloud/vm/VmWorkJobDispatcher.java +++ b/server/src/com/cloud/vm/VmWorkJobDispatcher.java @@ -33,6 +33,7 @@ import com.cloud.user.AccountVO; import com.cloud.user.UserContext; import com.cloud.user.dao.AccountDao; import com.cloud.utils.component.AdapterBase; +import com.cloud.vm.dao.VMInstanceDao; public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatcher { private static final Logger s_logger = Logger.getLogger(VmWorkJobDispatcher.class); @@ -40,6 +41,7 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch @Inject private VirtualMachineManager _vmMgr; @Inject private AsyncJobManager _asyncJobMgr; @Inject private AccountDao _accountDao; + @Inject private VMInstanceDao _instanceDao; private Map _handlerMap = new HashMap(); @@ -55,11 +57,23 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch AccountVO account = _accountDao.findById(work.getAccountId()); assert(account != null); + VMInstanceVO vm = _instanceDao.findById(work.getVmId()); + assert(vm != null); + + // + // Due to legcy massive generic usage in VirtualMachineManagerImpl, we can't dispatch job handling + // directly to VirtualMachineManagerImpl, since most handling method are generic method. + // + // to solve the problem, we have to go through an instantiated VirtualMachineGuru so that it can carry + // down correct type back to VirtualMachineManagerImpl. It is sad that we have to write code like this + // + VirtualMachineGuru guru = _vmMgr.getVmGuru(vm); + UserContext.registerContext(work.getUserId(), account, null, false); try { - Method handler = getHandler(cmd); + Method handler = getHandler(guru, cmd); if(handler != null) { - handler.invoke(_vmMgr, work); + handler.invoke(guru, work); _asyncJobMgr.completeAsyncJob(job.getId(), AsyncJobResult.STATUS_SUCCEEDED, 0, null); } else { _asyncJobMgr.completeAsyncJob(job.getId(), AsyncJobResult.STATUS_FAILED, 0, null); @@ -72,14 +86,15 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch } } - private Method getHandler(String cmd) { + private Method getHandler(VirtualMachineGuru guru, String cmd) { synchronized(_handlerMap) { - Method method = _handlerMap.get(cmd); + Class clz = guru.getClass(); + String key = clz.getCanonicalName() + cmd; + Method method = _handlerMap.get(key); if(method != null) return method; - Class clz = _vmMgr.getClass(); try { method = clz.getMethod(cmd, VmWork.class); method.setAccessible(true); @@ -93,7 +108,7 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch return null; } - _handlerMap.put(cmd, method); + _handlerMap.put(key, method); return method; } } diff --git a/server/src/org/apache/cloudstack/messagebus/SubjectConstants.java b/server/src/org/apache/cloudstack/messagebus/TopicConstants.java similarity index 96% rename from server/src/org/apache/cloudstack/messagebus/SubjectConstants.java rename to server/src/org/apache/cloudstack/messagebus/TopicConstants.java index 0b29b2bc5e3..d90f602cba2 100644 --- a/server/src/org/apache/cloudstack/messagebus/SubjectConstants.java +++ b/server/src/org/apache/cloudstack/messagebus/TopicConstants.java @@ -16,7 +16,7 @@ // under the License. package org.apache.cloudstack.messagebus; -public interface SubjectConstants { +public interface TopicConstants { // VM power state messages on message bus public static final String VM_POWER_STATE = "vm.powerstate"; diff --git a/server/test/com/cloud/async/AsyncJobTestConfiguration.java b/server/test/com/cloud/async/AsyncJobTestConfiguration.java index 9baff592e14..97345ddc2a9 100644 --- a/server/test/com/cloud/async/AsyncJobTestConfiguration.java +++ b/server/test/com/cloud/async/AsyncJobTestConfiguration.java @@ -23,6 +23,8 @@ import org.springframework.context.annotation.Configuration; import com.cloud.api.ApiDispatcher; import com.cloud.async.dao.AsyncJobDao; import com.cloud.async.dao.AsyncJobDaoImpl; +import com.cloud.async.dao.AsyncJobJournalDao; +import com.cloud.async.dao.AsyncJobJournalDaoImpl; import com.cloud.async.dao.SyncQueueDao; import com.cloud.async.dao.SyncQueueDaoImpl; import com.cloud.async.dao.SyncQueueItemDao; @@ -98,4 +100,9 @@ public class AsyncJobTestConfiguration { public VirtualMachineManager virtualMachineManager() { return Mockito.mock(VirtualMachineManager.class); } + + @Bean + public AsyncJobJournalDao asyncJobJournalDao() { + return new AsyncJobJournalDaoImpl(); + } } diff --git a/server/test/com/cloud/async/MockVirtualMachineManagerImpl.java b/server/test/com/cloud/async/MockVirtualMachineManagerImpl.java index 4566e8b91c7..c016ddd1549 100644 --- a/server/test/com/cloud/async/MockVirtualMachineManagerImpl.java +++ b/server/test/com/cloud/async/MockVirtualMachineManagerImpl.java @@ -51,6 +51,7 @@ import com.cloud.vm.NicProfile; import com.cloud.vm.NicVO; import com.cloud.vm.VMInstanceVO; import com.cloud.vm.VirtualMachine; +import com.cloud.vm.VmWork; import com.cloud.vm.VirtualMachine.Event; import com.cloud.vm.VirtualMachine.Type; import com.cloud.vm.VirtualMachineGuru; @@ -186,6 +187,11 @@ public class MockVirtualMachineManagerImpl implements VirtualMachineManager { } + public VirtualMachineGuru getVmGuru(T vm) { + // TODO Auto-generated method stub + return null; + } + @Override public Collection> getRegisteredGurus() { // TODO Auto-generated method stub @@ -383,5 +389,11 @@ public class MockVirtualMachineManagerImpl implements VirtualMachineManager { // TODO Auto-generated method stub return null; } - + + @Override + public T processVmStartWork(T vm, Map params, User caller, Account account, DeploymentPlan planToDeploy) + throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException { + + return vm; + } } diff --git a/server/test/com/cloud/async/TestAsyncJobManager.java b/server/test/com/cloud/async/TestAsyncJobManager.java index 3a58cc7c0b9..1b78557662f 100644 --- a/server/test/com/cloud/async/TestAsyncJobManager.java +++ b/server/test/com/cloud/async/TestAsyncJobManager.java @@ -16,13 +16,18 @@ // under the License. package com.cloud.async; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; + import javax.inject.Inject; import junit.framework.TestCase; import org.apache.cloudstack.framework.messagebus.MessageBus; import org.apache.cloudstack.framework.messagebus.PublishScope; -import org.apache.cloudstack.messagebus.SubjectConstants; +import org.apache.cloudstack.messagebus.TopicConstants; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -30,8 +35,10 @@ import org.mockito.Mockito; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import com.cloud.async.AsyncJobJournalVO; import com.cloud.async.AsyncJobManager; import com.cloud.async.AsyncJobMonitor; +import com.cloud.async.dao.AsyncJobJournalDao; import com.cloud.cluster.ClusterManager; import com.cloud.utils.Predicate; import com.cloud.utils.component.ComponentContext; @@ -45,19 +52,63 @@ public class TestAsyncJobManager extends TestCase { @Inject ClusterManager clusterMgr; @Inject MessageBus messageBus; @Inject AsyncJobMonitor jobMonitor; + @Inject AsyncJobJournalDao journalDao; @Before public void setUp() { ComponentContext.initComponentsLifeCycle(); Mockito.when(clusterMgr.getManagementNodeId()).thenReturn(1L); - Transaction.open("dummy"); + Transaction.open("dummy"); + + // drop constraint check in order to do single table test + Statement stat = null; + try { + stat = Transaction.currentTxn().getConnection().createStatement(); + stat.execute("SET foreign_key_checks = 0;"); + } catch (SQLException e) { + } finally { + if(stat != null) { + try { + stat.close(); + } catch (SQLException e) { + } + } + } } @After public void tearDown() { Transaction.currentTxn().close(); - } + } + + @Test + public void testJobJournal() { + AsyncJobJournalVO journal = new AsyncJobJournalVO(); + journal.setJobId(1L); + journal.setJournalType(AsyncJob.JournalType.SUCCESS); + journal.setJournalText("Journal record 1"); + + journalDao.persist(journal); + + AsyncJobJournalVO journal2 = new AsyncJobJournalVO(); + journal2.setJobId(1L); + journal2.setJournalType(AsyncJob.JournalType.SUCCESS); + journal2.setJournalText("Journal record 2"); + + journalDao.persist(journal2); + + List l = journalDao.getJobJournal(1L); + Assert.assertTrue(l.size() == 2); + journal = l.get(0); + Assert.assertTrue(journal.getJournalText().equals("Journal record 1")); + + journal2 = l.get(1); + Assert.assertTrue(journal2.getJournalText().equals("Journal record 2")); + + journalDao.expunge(journal.getId()); + journalDao.expunge(journal2.getId()); + } @Test public void testWaitAndCheck() { @@ -81,7 +132,7 @@ public class TestAsyncJobManager extends TestCase { asyncMgr.waitAndCheck(new String[] {"VM"}, 5000L, 10000L, new Predicate() { public boolean checkCondition() { System.out.println("Check condition to exit"); - messageBus.publish(null, SubjectConstants.JOB_HEARTBEAT, PublishScope.LOCAL, 1L); + messageBus.publish(null, TopicConstants.JOB_HEARTBEAT, PublishScope.LOCAL, 1L); return false; } }); diff --git a/server/test/com/cloud/vm/MockUserVmManagerImpl.java b/server/test/com/cloud/vm/MockUserVmManagerImpl.java index dd8dd83df58..6e388c44f62 100644 --- a/server/test/com/cloud/vm/MockUserVmManagerImpl.java +++ b/server/test/com/cloud/vm/MockUserVmManagerImpl.java @@ -422,4 +422,12 @@ public class MockUserVmManagerImpl extends ManagerBase implements UserVmManager, // TODO Auto-generated method stub return null; } + + @Override + public void vmWorkStart(VmWork work) { + } + + @Override + public void vmWorkStop(VmWork work) { + } } diff --git a/server/test/com/cloud/vm/VirtualMachineManagerImplTest.java b/server/test/com/cloud/vm/VirtualMachineManagerImplTest.java index 4f882d00c0e..8e977889605 100644 --- a/server/test/com/cloud/vm/VirtualMachineManagerImplTest.java +++ b/server/test/com/cloud/vm/VirtualMachineManagerImplTest.java @@ -116,7 +116,9 @@ public class VirtualMachineManagerImplTest { _vmMgr._capacityMgr = _capacityMgr; _vmMgr._hostDao = _hostDao; _vmMgr._nodeId = 1L; +/* _vmMgr._workDao = _workDao; +*/ _vmMgr._agentMgr = _agentMgr; when(_vmMock.getId()).thenReturn(314l); diff --git a/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java b/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java index 4216aff3860..5cef9c27de1 100644 --- a/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java +++ b/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java @@ -30,6 +30,8 @@ import org.apache.cloudstack.framework.messagebus.PublishScope; import com.cloud.agent.api.to.NicTO; import com.cloud.agent.api.to.VirtualMachineTO; +import com.cloud.async.AsyncJob.JournalType; +import com.cloud.async.AsyncJobExecutionContext; import com.cloud.deploy.DeployDestination; import com.cloud.deploy.DeploymentPlan; import com.cloud.exception.AgentUnavailableException; @@ -190,6 +192,11 @@ public class VmWorkMockVirtualMachineManagerImpl implements VirtualMachineManage // TODO Auto-generated method stub return null; } + + public VirtualMachineGuru getVmGuru(T vm) { + // TODO Auto-generated method stub + return null; + } @Override public boolean stateTransitTo(VMInstanceVO vm, Event e, Long hostId) @@ -383,8 +390,10 @@ public class VmWorkMockVirtualMachineManagerImpl implements VirtualMachineManage return null; } - public Void doVmWorkStart(VmWork work) { - _msgBus.publish(null, "Done", PublishScope.GLOBAL, null); - return null; - } + @Override + public T processVmStartWork(T vm, Map params, User caller, Account account, DeploymentPlan planToDeploy) + throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException { + + return vm; + } } diff --git a/server/test/com/cloud/vm/VmWorkTest.java b/server/test/com/cloud/vm/VmWorkTest.java index 2c76fe66161..75ba61206bb 100644 --- a/server/test/com/cloud/vm/VmWorkTest.java +++ b/server/test/com/cloud/vm/VmWorkTest.java @@ -40,6 +40,9 @@ import com.cloud.cluster.ClusterManager; import com.cloud.deploy.DataCenterDeployment; import com.cloud.deploy.DeploymentPlan; import com.cloud.deploy.DeploymentPlanner.ExcludeList; +import com.cloud.exception.InsufficientCapacityException; +import com.cloud.exception.InsufficientStorageCapacityException; +import com.cloud.serializer.SerializerHelper; import com.cloud.utils.LogUtils; import com.cloud.utils.Predicate; import com.cloud.utils.component.ComponentContext; @@ -137,4 +140,16 @@ public class VmWorkTest extends TestCase { } }); } + + @Test + public void testExceptionSerialization() { + InsufficientCapacityException exception = new InsufficientStorageCapacityException("foo", VmWorkJobVO.class, 1L); + + String encodedString = SerializerHelper.toObjectSerializedString(exception); + System.out.println(encodedString); + + exception = (InsufficientCapacityException)SerializerHelper.fromObjectSerializedString(encodedString); + Assert.assertTrue(exception.getScope() == VmWorkJobVO.class); + Assert.assertTrue(exception.getMessage().equals("foo")); + } } diff --git a/server/test/com/cloud/vm/VmWorkTestConfiguration.java b/server/test/com/cloud/vm/VmWorkTestConfiguration.java index f56e7bea987..2496dbe7d8b 100644 --- a/server/test/com/cloud/vm/VmWorkTestConfiguration.java +++ b/server/test/com/cloud/vm/VmWorkTestConfiguration.java @@ -25,6 +25,8 @@ import com.cloud.async.SyncQueueManager; import com.cloud.async.SyncQueueManagerImpl; import com.cloud.async.dao.AsyncJobDao; import com.cloud.async.dao.AsyncJobDaoImpl; +import com.cloud.async.dao.AsyncJobJournalDao; +import com.cloud.async.dao.AsyncJobJournalDaoImpl; import com.cloud.async.dao.SyncQueueDao; import com.cloud.async.dao.SyncQueueDaoImpl; import com.cloud.async.dao.SyncQueueItemDao; @@ -103,4 +105,9 @@ public class VmWorkTestConfiguration { public VmWorkJobDao vmworkJobDao() { return new VmWorkJobDaoImpl(); } + + @Bean + public AsyncJobJournalDao jobJournalDao() { + return new AsyncJobJournalDaoImpl(); + } } diff --git a/setup/db/db/schema-410to420.sql b/setup/db/db/schema-410to420.sql index b0daaf52548..c615c4fb458 100644 --- a/setup/db/db/schema-410to420.sql +++ b/setup/db/db/schema-410to420.sql @@ -411,7 +411,6 @@ ALTER TABLE `cloud`.`async_job` DROP COLUMN `job_cmd_originator`; ALTER TABLE `cloud`.`async_job` DROP COLUMN `callback_type`; ALTER TABLE `cloud`.`async_job` DROP COLUMN `callback_address`; -ALTER TABLE `cloud`.`async_job` ADD COLUMN `parent_id` bigint; ALTER TABLE `cloud`.`async_job` ADD COLUMN `job_type` VARCHAR(32); ALTER TABLE `cloud`.`async_job` ADD COLUMN `job_dispatcher` VARCHAR(64); ALTER TABLE `cloud`.`async_job` ADD COLUMN `job_executing_msid` bigint; @@ -433,3 +432,13 @@ CREATE TABLE `cloud`.`vm_work_job` ( INDEX `i_vm_work_job__step`(`step`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; +CREATE TABLE `cloud`.`async_job_journal` ( + `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'id', + `job_id` bigint unsigned NOT NULL, + `journal_type` varchar(32), + `journal_text` varchar(1024) COMMENT 'journal descriptive informaton', + `journal_obj` varchar(1024) COMMENT 'journal strutural information, JSON encoded object', + `created` datetime NOT NULL COMMENT 'date created', + PRIMARY KEY (`id`), + CONSTRAINT `fk_async_job_journal__job_id` FOREIGN KEY (`job_id`) REFERENCES `async_job`(`id`) ON DELETE CASCADE +) ENGINE=InnoDB DEFAULT CHARSET=utf8;