From 2230c24ab3af20b5c43727dc7e5df6a58ffce2c8 Mon Sep 17 00:00:00 2001 From: Alex Huang Date: Fri, 1 Apr 2011 10:40:40 -0700 Subject: [PATCH] check in changes from 2.2.4 --- .../com/cloud/alert/ClusterAlertAdapter.java | 46 ++++--- ...askManager.java => CheckPointManager.java} | 12 +- ...erImpl.java => CheckPointManagerImpl.java} | 37 +++--- .../{TaskVO.java => CheckPointVO.java} | 14 ++- .../com/cloud/cluster/ClusterManagerImpl.java | 2 +- .../cloud/cluster/ManagementServerHostVO.java | 34 ++--- server/src/com/cloud/cluster/StackMaid.java | 4 +- .../com/cloud/cluster/dao/StackMaidDao.java | 12 +- .../cloud/cluster/dao/StackMaidDaoImpl.java | 40 +++--- .../DefaultComponentLibrary.java | 4 +- server/test/com/cloud/async/TestAsync.java | 8 +- ...erTest.java => CheckPointManagerTest.java} | 28 ++--- setup/db/create-schema.sql | 1 + setup/db/db/schema-222to224.sql | 8 +- setup/db/db/schema-snapshot-217to223.sql | 2 +- utils/src/com/cloud/utils/db/Transaction.java | 118 ++++++++++++------ .../cloud/utils/db/TransactionAttachment.java | 37 ++++++ 17 files changed, 247 insertions(+), 160 deletions(-) rename server/src/com/cloud/cluster/{TaskManager.java => CheckPointManager.java} (74%) rename server/src/com/cloud/cluster/{TaskManagerImpl.java => CheckPointManagerImpl.java} (83%) rename server/src/com/cloud/cluster/{TaskVO.java => CheckPointVO.java} (85%) rename server/test/com/cloud/cluster/{TaskManagerTest.java => CheckPointManagerTest.java} (93%) create mode 100644 utils/src/com/cloud/utils/db/TransactionAttachment.java diff --git a/server/src/com/cloud/alert/ClusterAlertAdapter.java b/server/src/com/cloud/alert/ClusterAlertAdapter.java index f9dde577a63..42722aa4022 100644 --- a/server/src/com/cloud/alert/ClusterAlertAdapter.java +++ b/server/src/com/cloud/alert/ClusterAlertAdapter.java @@ -7,8 +7,6 @@ import javax.naming.ConfigurationException; import org.apache.log4j.Logger; -import com.cloud.alert.AlertAdapter; -import com.cloud.alert.AlertManager; import com.cloud.cluster.ClusterManager; import com.cloud.cluster.ClusterNodeJoinEventArgs; import com.cloud.cluster.ClusterNodeLeftEventArgs; @@ -30,8 +28,9 @@ public class ClusterAlertAdapter implements AlertAdapter { private ManagementServerHostDao _mshostDao; public void onClusterAlert(Object sender, EventArgs args) { - if(s_logger.isDebugEnabled()) - s_logger.debug("Receive cluster alert, EventArgs: " + args.getClass().getName()); + if(s_logger.isDebugEnabled()) { + s_logger.debug("Receive cluster alert, EventArgs: " + args.getClass().getName()); + } if(args instanceof ClusterNodeJoinEventArgs) { onClusterNodeJoined(sender, (ClusterNodeJoinEventArgs)args); @@ -43,13 +42,15 @@ public class ClusterAlertAdapter implements AlertAdapter { } private void onClusterNodeJoined(Object sender, ClusterNodeJoinEventArgs args) { - if(s_logger.isDebugEnabled()) - s_logger.debug("Handle cluster node join alert, self node: " + args.getSelf()); + if(s_logger.isDebugEnabled()) { + s_logger.debug("Handle cluster node join alert, self node: " + args.getSelf()); + } for(ManagementServerHostVO mshost : args.getJoinedNodes()) { - if(mshost.getId().longValue() == args.getSelf().longValue()) { - if(s_logger.isDebugEnabled()) - s_logger.debug("Management server node " + mshost.getServiceIP() + " is up, send alert"); + if(mshost.getId() == args.getSelf().longValue()) { + if(s_logger.isDebugEnabled()) { + s_logger.debug("Management server node " + mshost.getServiceIP() + " is up, send alert"); + } _alertMgr.sendAlert(AlertManager.ALERT_TYPE_MANAGMENT_NODE, 0, new Long(0), "Management server node " + mshost.getServiceIP() + " is up", ""); @@ -60,11 +61,12 @@ public class ClusterAlertAdapter implements AlertAdapter { private void onClusterNodeLeft(Object sender, ClusterNodeLeftEventArgs args) { - if(s_logger.isDebugEnabled()) - s_logger.debug("Handle cluster node left alert, self node: " + args.getSelf()); + if(s_logger.isDebugEnabled()) { + s_logger.debug("Handle cluster node left alert, self node: " + args.getSelf()); + } for(ManagementServerHostVO mshost : args.getLeftNodes()) { - if(mshost.getId().longValue() != args.getSelf().longValue()) { + if(mshost.getId() != args.getSelf().longValue()) { GlobalLock lock = GlobalLock.getInternLock("ManagementAlert." + mshost.getId()); try { if(lock.lock(180)) { @@ -73,14 +75,16 @@ public class ClusterAlertAdapter implements AlertAdapter { if(alertHost.getAlertCount() == 0) { _mshostDao.increaseAlertCount(mshost.getId()); - if(s_logger.isDebugEnabled()) - s_logger.debug("Detected management server node " + mshost.getServiceIP() + " is down, send alert"); + if(s_logger.isDebugEnabled()) { + s_logger.debug("Detected management server node " + mshost.getServiceIP() + " is down, send alert"); + } _alertMgr.sendAlert(AlertManager.ALERT_TYPE_MANAGMENT_NODE, 0, new Long(0), "Management server node " + mshost.getServiceIP() + " is down", ""); } else { - if(s_logger.isDebugEnabled()) - s_logger.debug("Detected management server node " + mshost.getServiceIP() + " is down, but alert has already been set"); + if(s_logger.isDebugEnabled()) { + s_logger.debug("Detected management server node " + mshost.getServiceIP() + " is down, but alert has already been set"); + } } } finally { lock.unlock(); @@ -97,14 +101,16 @@ public class ClusterAlertAdapter implements AlertAdapter { public boolean configure(String name, Map params) throws ConfigurationException { - if (s_logger.isInfoEnabled()) - s_logger.info("Start configuring cluster alert manager : " + name); + if (s_logger.isInfoEnabled()) { + s_logger.info("Start configuring cluster alert manager : " + name); + } ComponentLocator locator = ComponentLocator.getCurrentLocator(); _mshostDao = locator.getDao(ManagementServerHostDao.class); - if(_mshostDao == null) - throw new ConfigurationException("Unable to get " + ManagementServerHostDao.class.getName()); + if(_mshostDao == null) { + throw new ConfigurationException("Unable to get " + ManagementServerHostDao.class.getName()); + } _alertMgr = locator.getManager(AlertManager.class); if (_alertMgr == null) { diff --git a/server/src/com/cloud/cluster/TaskManager.java b/server/src/com/cloud/cluster/CheckPointManager.java similarity index 74% rename from server/src/com/cloud/cluster/TaskManager.java rename to server/src/com/cloud/cluster/CheckPointManager.java index 9f17fbfb4b4..2f884af2ba5 100644 --- a/server/src/com/cloud/cluster/TaskManager.java +++ b/server/src/com/cloud/cluster/CheckPointManager.java @@ -1,6 +1,5 @@ package com.cloud.cluster; -import com.cloud.utils.component.Manager; /** * TaskManager helps business logic deal with clustering failover. @@ -14,23 +13,22 @@ import com.cloud.utils.component.Manager; * cleanup when the dead server resumes. * */ -public interface TaskManager extends Manager { +public interface CheckPointManager { /** * Adds a task with the context as to what the task is and the class * responsible for cleaning up. * * @param context context information to be stored. - * @param cleaner clazz responsible for cleanup if the process was interrupted. - * @return task id. + * @return Check point id. */ - long addTask(CleanupMaid context); + long pushCheckPoint(CleanupMaid context); /** * update the task with new context * @param taskId * @param updatedContext new updated context. */ - void updateTask(long taskId, CleanupMaid updatedContext); + void updateCheckPointState(long taskId, CleanupMaid updatedContext); /** @@ -38,5 +36,5 @@ public interface TaskManager extends Manager { * * @param taskId */ - void taskCompleted(long taskId); + void popCheckPoint(long taskId); } diff --git a/server/src/com/cloud/cluster/TaskManagerImpl.java b/server/src/com/cloud/cluster/CheckPointManagerImpl.java similarity index 83% rename from server/src/com/cloud/cluster/TaskManagerImpl.java rename to server/src/com/cloud/cluster/CheckPointManagerImpl.java index 9241670b37b..4d36958d053 100644 --- a/server/src/com/cloud/cluster/TaskManagerImpl.java +++ b/server/src/com/cloud/cluster/CheckPointManagerImpl.java @@ -21,13 +21,14 @@ import com.cloud.utils.DateUtil; import com.cloud.utils.NumbersUtil; import com.cloud.utils.component.ComponentLocator; import com.cloud.utils.component.Inject; +import com.cloud.utils.component.Manager; import com.cloud.utils.concurrency.NamedThreadFactory; import com.cloud.utils.db.DB; import com.cloud.utils.db.GlobalLock; -@Local(value=TaskManager.class) -public class TaskManagerImpl implements TaskManager, ClusterManagerListener { - private static final Logger s_logger = Logger.getLogger(TaskManagerImpl.class); +@Local(value=CheckPointManager.class) +public class CheckPointManagerImpl implements CheckPointManager, Manager, ClusterManagerListener { + private static final Logger s_logger = Logger.getLogger(CheckPointManagerImpl.class); private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds private static final int GC_INTERVAL = 10000; // 10 seconds @@ -47,7 +48,7 @@ public class TaskManagerImpl implements TaskManager, ClusterManagerListener { private final ScheduledExecutorService _cleanupScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Task-Cleanup")); - protected TaskManagerImpl() { + protected CheckPointManagerImpl() { } @Override @@ -73,8 +74,8 @@ public class TaskManagerImpl implements TaskManager, ClusterManagerListener { return true; } - private void cleanupLeftovers(List l) { - for (TaskVO maid : l) { + private void cleanupLeftovers(List l) { + for (CheckPointVO maid : l) { if (StackMaid.doCleanup(maid)) { _maidDao.expunge(maid.getId()); } @@ -103,7 +104,7 @@ public class TaskManagerImpl implements TaskManager, ClusterManagerListener { public void reallyRun() { try { Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - 7200000); - List l = _maidDao.listLeftoversByCutTime(cutTime); + List l = _maidDao.listLeftoversByCutTime(cutTime); cleanupLeftovers(l); } catch (Throwable e) { s_logger.error("Unexpected exception when trying to execute queue item, ", e); @@ -152,24 +153,24 @@ public class TaskManagerImpl implements TaskManager, ClusterManagerListener { } @Override - public long addTask(CleanupMaid context) { - return _maidDao.pushCleanupDelegate(_msId, 0, context.getClass().getName(), context); + public long pushCheckPoint(CleanupMaid context) { + long seq = _maidDao.pushCleanupDelegate(_msId, 0, context.getClass().getName(), context); } @Override - public void updateTask(long taskId, CleanupMaid updatedContext) { - TaskVO task = _maidDao.createForUpdate(); + public void updateCheckPointState(long taskId, CleanupMaid updatedContext) { + CheckPointVO task = _maidDao.createForUpdate(); task.setDelegate(updatedContext.getClass().getName()); task.setContext(SerializerHelper.toSerializedStringOld(updatedContext)); _maidDao.update(taskId, task); } @Override - public void taskCompleted(long taskId) { + public void popCheckPoint(long taskId) { _maidDao.remove(taskId); } - protected boolean cleanup(TaskVO task) { + protected boolean cleanup(CheckPointVO task) { s_logger.info("Cleaning up " + task); CleanupMaid delegate = (CleanupMaid)SerializerHelper.fromSerializedString(task.getContext()); assert delegate.getClass().getName().equals(task.getDelegate()) : "Deserializer says " + delegate.getClass().getName() + " but it's suppose to be " + task.getDelegate(); @@ -181,7 +182,7 @@ public class TaskManagerImpl implements TaskManager, ClusterManagerListener { } else { s_logger.warn("Unsuccessful in cleaning up " + task + ". Procedure to cleanup manaully: " + delegate.getCleanupProcedure()); } - taskCompleted(task.getId()); + popCheckPoint(task.getId()); return true; } else { s_logger.error("Unable to cleanup " + task.getId()); @@ -197,11 +198,11 @@ public class TaskManagerImpl implements TaskManager, ClusterManagerListener { @Override public void run() { try { - List tasks = _maidDao.listCleanupTasks(_msId); + List tasks = _maidDao.listCleanupTasks(_msId); - List retries = new ArrayList(); + List retries = new ArrayList(); - for (TaskVO task : tasks) { + for (CheckPointVO task : tasks) { try { if (!cleanup(task)) { retries.add(task); @@ -216,7 +217,7 @@ public class TaskManagerImpl implements TaskManager, ClusterManagerListener { if (_cleanupRetryInterval > 0) { _cleanupScheduler.schedule(this, _cleanupRetryInterval, TimeUnit.SECONDS); } else { - for (TaskVO task : retries) { + for (CheckPointVO task : retries) { s_logger.warn("Cleanup procedure for " + task + ": " + ((CleanupMaid)SerializerHelper.fromSerializedString(task.getContext())).getCleanupProcedure()); } } diff --git a/server/src/com/cloud/cluster/TaskVO.java b/server/src/com/cloud/cluster/CheckPointVO.java similarity index 85% rename from server/src/com/cloud/cluster/TaskVO.java rename to server/src/com/cloud/cluster/CheckPointVO.java index 1c870a2a6cc..33c80da889a 100644 --- a/server/src/com/cloud/cluster/TaskVO.java +++ b/server/src/com/cloud/cluster/CheckPointVO.java @@ -13,7 +13,7 @@ import com.cloud.utils.db.GenericDao; @Entity @Table(name="stack_maid") -public class TaskVO { +public class CheckPointVO { @Id @GeneratedValue(strategy=GenerationType.IDENTITY) @@ -27,7 +27,7 @@ public class TaskVO { private long threadId; @Column(name="seq") - private int seq; + private long seq; @Column(name="cleanup_delegate", length=128) private String delegate; @@ -38,7 +38,11 @@ public class TaskVO { @Column(name=GenericDao.CREATED_COLUMN) private Date created; - public TaskVO() { + public CheckPointVO() { + } + + public CheckPointVO(long seq) { + this.seq = seq; } public long getId() { @@ -61,11 +65,11 @@ public class TaskVO { this.threadId = threadId; } - public int getSeq() { + public long getSeq() { return seq; } - public void setSeq(int seq) { + public void setSeq(long seq) { this.seq = seq; } diff --git a/server/src/com/cloud/cluster/ClusterManagerImpl.java b/server/src/com/cloud/cluster/ClusterManagerImpl.java index 39108de8f97..d573fa2fb20 100644 --- a/server/src/com/cloud/cluster/ClusterManagerImpl.java +++ b/server/src/com/cloud/cluster/ClusterManagerImpl.java @@ -670,7 +670,7 @@ public class ClusterManagerImpl implements ClusterManager { private static boolean isIdInList(Long id, List l) { for(ManagementServerHostVO mshost : l) { - if(mshost.getId() != null && mshost.getId() == id) { + if(mshost.getId() == id) { return true; } } diff --git a/server/src/com/cloud/cluster/ManagementServerHostVO.java b/server/src/com/cloud/cluster/ManagementServerHostVO.java index a7ec96edb2e..70f0eda6273 100644 --- a/server/src/com/cloud/cluster/ManagementServerHostVO.java +++ b/server/src/com/cloud/cluster/ManagementServerHostVO.java @@ -18,17 +18,17 @@ package com.cloud.cluster; -import java.util.Date; - -import javax.persistence.Column; -import javax.persistence.Entity; -import javax.persistence.GeneratedValue; -import javax.persistence.GenerationType; -import javax.persistence.Id; -import javax.persistence.Table; -import javax.persistence.Temporal; -import javax.persistence.TemporalType; - +import java.util.Date; + +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.persistence.Table; +import javax.persistence.Temporal; +import javax.persistence.TemporalType; + import com.cloud.utils.db.GenericDao; @Entity @@ -38,10 +38,10 @@ public class ManagementServerHostVO { @Id @GeneratedValue(strategy=GenerationType.IDENTITY) @Column(name="id") - private Long id; + private long id; @Column(name="msid", updatable=true, nullable=false) - private Long msid; + private long msid; @Column(name="name", updatable=true, nullable=true) private String name; @@ -75,19 +75,19 @@ public class ManagementServerHostVO { this.lastUpdateTime = updateTime; } - public Long getId() { + public long getId() { return id; } - public void setId(Long id) { + public void setId(long id) { this.id = id; } - public Long getMsid() { + public long getMsid() { return msid; } - public void setMsid(Long msid) { + public void setMsid(long msid) { this.msid = msid; } diff --git a/server/src/com/cloud/cluster/StackMaid.java b/server/src/com/cloud/cluster/StackMaid.java index 3f97b3282c0..0d6cacf344e 100644 --- a/server/src/com/cloud/cluster/StackMaid.java +++ b/server/src/com/cloud/cluster/StackMaid.java @@ -99,7 +99,7 @@ public class StackMaid { public void exitCleanup(long currentMsid) { if(currentSeq > 0) { - TaskVO maid = null; + CheckPointVO maid = null; while((maid = maidDao.popCleanupDelegate(currentMsid)) != null) { doCleanup(maid); } @@ -109,7 +109,7 @@ public class StackMaid { context.clear(); } - public static boolean doCleanup(TaskVO maid) { + public static boolean doCleanup(CheckPointVO maid) { if(maid.getDelegate() != null) { try { Class clz = Class.forName(maid.getDelegate()); diff --git a/server/src/com/cloud/cluster/dao/StackMaidDao.java b/server/src/com/cloud/cluster/dao/StackMaidDao.java index e79d7e23d72..742579b82fd 100644 --- a/server/src/com/cloud/cluster/dao/StackMaidDao.java +++ b/server/src/com/cloud/cluster/dao/StackMaidDao.java @@ -3,16 +3,16 @@ package com.cloud.cluster.dao; import java.util.Date; import java.util.List; -import com.cloud.cluster.TaskVO; +import com.cloud.cluster.CheckPointVO; import com.cloud.utils.db.GenericDao; -public interface StackMaidDao extends GenericDao { +public interface StackMaidDao extends GenericDao { public long pushCleanupDelegate(long msid, int seq, String delegateClzName, Object context); - public TaskVO popCleanupDelegate(long msid); + public CheckPointVO popCleanupDelegate(long msid); public void clearStack(long msid); - public List listLeftoversByMsid(long msid); - public List listLeftoversByCutTime(Date cutTime); + public List listLeftoversByMsid(long msid); + public List listLeftoversByCutTime(Date cutTime); /** * Take over the task items of another management server and clean them up. @@ -26,5 +26,5 @@ public interface StackMaidDao extends GenericDao { */ boolean takeover(long takeOverMsid, long selfId); - List listCleanupTasks(long selfId); + List listCleanupTasks(long selfId); } diff --git a/server/src/com/cloud/cluster/dao/StackMaidDaoImpl.java b/server/src/com/cloud/cluster/dao/StackMaidDaoImpl.java index c430c1621a1..d9370676d12 100644 --- a/server/src/com/cloud/cluster/dao/StackMaidDaoImpl.java +++ b/server/src/com/cloud/cluster/dao/StackMaidDaoImpl.java @@ -12,7 +12,7 @@ import javax.ejb.Local; import org.apache.log4j.Logger; -import com.cloud.cluster.TaskVO; +import com.cloud.cluster.CheckPointVO; import com.cloud.serializer.SerializerHelper; import com.cloud.utils.DateUtil; import com.cloud.utils.db.DB; @@ -24,12 +24,12 @@ import com.cloud.utils.db.SearchCriteria.Op; import com.cloud.utils.db.Transaction; @Local(value = { StackMaidDao.class }) @DB(txn=false) -public class StackMaidDaoImpl extends GenericDaoBase implements StackMaidDao { +public class StackMaidDaoImpl extends GenericDaoBase implements StackMaidDao { private static final Logger s_logger = Logger.getLogger(StackMaidDaoImpl.class); - private SearchBuilder popSearch; - private SearchBuilder clearSearch; - private final SearchBuilder AllFieldsSearch; + private SearchBuilder popSearch; + private SearchBuilder clearSearch; + private final SearchBuilder AllFieldsSearch; public StackMaidDaoImpl() { popSearch = createSearchBuilder(); @@ -47,19 +47,19 @@ public class StackMaidDaoImpl extends GenericDaoBase implements St @Override public boolean takeover(long takeOverMsid, long selfId) { - TaskVO task = createForUpdate(); + CheckPointVO task = createForUpdate(); task.setMsid(selfId); task.setThreadId(0); - SearchCriteria sc = AllFieldsSearch.create(); + SearchCriteria sc = AllFieldsSearch.create(); sc.setParameters("msid", takeOverMsid); return update(task, sc) > 0; } @Override - public List listCleanupTasks(long msId) { - SearchCriteria sc = AllFieldsSearch.create(); + public List listCleanupTasks(long msId) { + SearchCriteria sc = AllFieldsSearch.create(); sc.setParameters("msid", msId); sc.setParameters("thread", 0); @@ -68,7 +68,7 @@ public class StackMaidDaoImpl extends GenericDaoBase implements St @Override public long pushCleanupDelegate(long msid, int seq, String delegateClzName, Object context) { - TaskVO delegateItem = new TaskVO(); + CheckPointVO delegateItem = new CheckPointVO(); delegateItem.setMsid(msid); delegateItem.setThreadId(Thread.currentThread().getId()); delegateItem.setSeq(seq); @@ -81,13 +81,13 @@ public class StackMaidDaoImpl extends GenericDaoBase implements St } @Override - public TaskVO popCleanupDelegate(long msid) { - SearchCriteria sc = popSearch.create(); + public CheckPointVO popCleanupDelegate(long msid) { + SearchCriteria sc = popSearch.create(); sc.setParameters("msid", msid); sc.setParameters("threadId", Thread.currentThread().getId()); - Filter filter = new Filter(TaskVO.class, "seq", false, 0L, (long)1); - List l = listIncludingRemovedBy(sc, filter); + Filter filter = new Filter(CheckPointVO.class, "seq", false, 0L, (long)1); + List l = listIncludingRemovedBy(sc, filter); if(l != null && l.size() > 0) { expunge(l.get(0).getId()); return l.get(0); @@ -98,7 +98,7 @@ public class StackMaidDaoImpl extends GenericDaoBase implements St @Override public void clearStack(long msid) { - SearchCriteria sc = clearSearch.create(); + SearchCriteria sc = clearSearch.create(); sc.setParameters("msid", msid); expunge(sc); @@ -106,11 +106,11 @@ public class StackMaidDaoImpl extends GenericDaoBase implements St @Override @DB - public List listLeftoversByMsid(long msid) { - List l = new ArrayList(); + public List listLeftoversByMsid(long msid) { + List l = new ArrayList(); String sql = "select * from stack_maid where msid=? order by msid asc, thread_id asc, seq desc"; - Transaction txn = Transaction.open(Transaction.CLOUD_DB); + Transaction txn = Transaction.currentTxn(); PreparedStatement pstmt = null; try { pstmt = txn.prepareAutoCloseStatement(sql); @@ -132,9 +132,9 @@ public class StackMaidDaoImpl extends GenericDaoBase implements St @Override @DB - public List listLeftoversByCutTime(Date cutTime) { + public List listLeftoversByCutTime(Date cutTime) { - List l = new ArrayList(); + List l = new ArrayList(); String sql = "select * from stack_maid where created < ? order by msid asc, thread_id asc, seq desc"; Transaction txn = Transaction.open(Transaction.CLOUD_DB); diff --git a/server/src/com/cloud/configuration/DefaultComponentLibrary.java b/server/src/com/cloud/configuration/DefaultComponentLibrary.java index ac3dbfa8719..ecfd6199d23 100644 --- a/server/src/com/cloud/configuration/DefaultComponentLibrary.java +++ b/server/src/com/cloud/configuration/DefaultComponentLibrary.java @@ -39,7 +39,7 @@ import com.cloud.certificate.dao.CertificateDaoImpl; import com.cloud.cluster.ClusterManagerImpl; import com.cloud.cluster.DummyClusterManagerImpl; import com.cloud.cluster.ManagementServerNode; -import com.cloud.cluster.TaskManagerImpl; +import com.cloud.cluster.CheckPointManagerImpl; import com.cloud.cluster.dao.ManagementServerHostDaoImpl; import com.cloud.cluster.dao.StackMaidDaoImpl; import com.cloud.configuration.dao.ConfigurationDaoImpl; @@ -273,7 +273,7 @@ public class DefaultComponentLibrary extends ComponentLibraryBase implements Com } protected void populateManagers() { - addManager("StackMaidManager", TaskManagerImpl.class); + addManager("StackMaidManager", CheckPointManagerImpl.class); addManager("agent manager", AgentManagerImpl.class); addManager("account manager", AccountManagerImpl.class); addManager("configuration manager", ConfigurationManagerImpl.class); diff --git a/server/test/com/cloud/async/TestAsync.java b/server/test/com/cloud/async/TestAsync.java index 921b382f34c..6b3f42a9d6f 100644 --- a/server/test/com/cloud/async/TestAsync.java +++ b/server/test/com/cloud/async/TestAsync.java @@ -27,7 +27,7 @@ import junit.framework.Assert; import com.cloud.async.AsyncJobVO; import com.cloud.cluster.StackMaid; -import com.cloud.cluster.TaskVO; +import com.cloud.cluster.CheckPointVO; import com.cloud.cluster.dao.StackMaidDao; import com.cloud.cluster.dao.StackMaidDaoImpl; import com.cloud.serializer.Param; @@ -216,7 +216,7 @@ public class TestAsync extends Log4jEnabledTestCase { dao.pushCleanupDelegate(1L, 1, "delegate2", new Long(100)); dao.pushCleanupDelegate(1L, 2, "delegate3", null); - TaskVO item = dao.popCleanupDelegate(1L); + CheckPointVO item = dao.popCleanupDelegate(1L); Assert.assertTrue(item.getDelegate().equals("delegate3")); Assert.assertTrue(item.getContext() == null); @@ -283,8 +283,8 @@ public class TestAsync extends Log4jEnabledTestCase { Transaction txn = Transaction.open(Transaction.CLOUD_DB); StackMaidDao dao = new StackMaidDaoImpl(); - List l = dao.listLeftoversByMsid(1L); - for(TaskVO maid : l) { + List l = dao.listLeftoversByMsid(1L); + for(CheckPointVO maid : l) { s_logger.info("" + maid.getThreadId() + " " + maid.getDelegate() + " " + maid.getContext()); } diff --git a/server/test/com/cloud/cluster/TaskManagerTest.java b/server/test/com/cloud/cluster/CheckPointManagerTest.java similarity index 93% rename from server/test/com/cloud/cluster/TaskManagerTest.java rename to server/test/com/cloud/cluster/CheckPointManagerTest.java index f21c013b022..9692f04b423 100644 --- a/server/test/com/cloud/cluster/TaskManagerTest.java +++ b/server/test/com/cloud/cluster/CheckPointManagerTest.java @@ -52,8 +52,8 @@ import com.cloud.utils.component.MockComponentLocator; import com.cloud.utils.db.Transaction; import com.cloud.utils.exception.CloudRuntimeException; -public class TaskManagerTest extends TestCase { - private final static Logger s_logger = Logger.getLogger(TaskManagerTest.class); +public class CheckPointManagerTest extends TestCase { + private final static Logger s_logger = Logger.getLogger(CheckPointManagerTest.class); @Override @Before @@ -85,46 +85,46 @@ public class TaskManagerTest extends TestCase { public void testCompleteCase() throws Exception { ComponentLocator locator = ComponentLocator.getCurrentLocator(); - TaskManagerImpl taskMgr = ComponentLocator.inject(TaskManagerImpl.class); + CheckPointManagerImpl taskMgr = ComponentLocator.inject(CheckPointManagerImpl.class); assertTrue(taskMgr.configure("TaskManager", new HashMap())); assertTrue(taskMgr.start()); MockMaid delegate = new MockMaid(); delegate.setValue("first"); - long taskId = taskMgr.addTask(delegate); + long taskId = taskMgr.pushCheckPoint(delegate); StackMaidDao maidDao = locator.getDao(StackMaidDao.class); - TaskVO task = maidDao.findById(taskId); + CheckPointVO task = maidDao.findById(taskId); assertEquals(task.getDelegate(), MockMaid.class.getName()); MockMaid retrieved = (MockMaid)SerializerHelper.fromSerializedString(task.getContext()); assertEquals(retrieved.getValue(), delegate.getValue()); delegate.setValue("second"); - taskMgr.updateTask(taskId, delegate); + taskMgr.updateCheckPointState(taskId, delegate); task = maidDao.findById(taskId); assertEquals(task.getDelegate(), MockMaid.class.getName()); retrieved = (MockMaid)SerializerHelper.fromSerializedString(task.getContext()); assertEquals(retrieved.getValue(), delegate.getValue()); - taskMgr.taskCompleted(taskId); + taskMgr.popCheckPoint(taskId); assertNull(maidDao.findById(taskId)); } public void testSimulatedReboot() throws Exception { ComponentLocator locator = ComponentLocator.getCurrentLocator(); - TaskManagerImpl taskMgr = ComponentLocator.inject(TaskManagerImpl.class); + CheckPointManagerImpl taskMgr = ComponentLocator.inject(CheckPointManagerImpl.class); assertTrue(taskMgr.configure("TaskManager", new HashMap())); assertTrue(taskMgr.start()); MockMaid maid = new MockMaid(); maid.setValue("first"); - long taskId = taskMgr.addTask(maid); + long taskId = taskMgr.pushCheckPoint(maid); StackMaidDao maidDao = locator.getDao(StackMaidDao.class); - TaskVO task = maidDao.findById(taskId); + CheckPointVO task = maidDao.findById(taskId); assertEquals(task.getDelegate(), MockMaid.class.getName()); MockMaid retrieved = (MockMaid)SerializerHelper.fromSerializedString(task.getContext()); @@ -134,7 +134,7 @@ public class TaskManagerTest extends TestCase { assertNotNull(MockMaid.map.get(maid.getSeq())); - taskMgr = ComponentLocator.inject(TaskManagerImpl.class); + taskMgr = ComponentLocator.inject(CheckPointManagerImpl.class); HashMap params = new HashMap(); params.put(Config.TaskCleanupRetryInterval.key(), "1"); taskMgr.configure("TaskManager", params); @@ -151,16 +151,16 @@ public class TaskManagerTest extends TestCase { public void testTakeover() throws Exception { ComponentLocator locator = ComponentLocator.getCurrentLocator(); - TaskManagerImpl taskMgr = ComponentLocator.inject(TaskManagerImpl.class); + CheckPointManagerImpl taskMgr = ComponentLocator.inject(CheckPointManagerImpl.class); assertTrue(taskMgr.configure("TaskManager", new HashMap())); assertTrue(taskMgr.start()); MockMaid delegate = new MockMaid(); delegate.setValue("first"); - long taskId = taskMgr.addTask(delegate); + long taskId = taskMgr.pushCheckPoint(delegate); StackMaidDao maidDao = locator.getDao(StackMaidDao.class); - TaskVO task = maidDao.findById(taskId); + CheckPointVO task = maidDao.findById(taskId); assertEquals(task.getDelegate(), MockMaid.class.getName()); MockMaid retrieved = (MockMaid)SerializerHelper.fromSerializedString(task.getContext()); diff --git a/setup/db/create-schema.sql b/setup/db/create-schema.sql index 05f05693fef..0a14f7a1df7 100755 --- a/setup/db/create-schema.sql +++ b/setup/db/create-schema.sql @@ -362,6 +362,7 @@ INSERT INTO `cloud`.`sequence` (name, value) VALUES ('private_mac_address_seq', INSERT INTO `cloud`.`sequence` (name, value) VALUES ('storage_pool_seq', 200); INSERT INTO `cloud`.`sequence` (name, value) VALUES ('volume_seq', 1); INSERT INTO `cloud`.`sequence` (name, value) VALUES ('networks_seq', 200); +INSERT INTO `cloud`.`sequence` (name, value) VALUES ('checkpoint_seq', 1); CREATE TABLE `cloud`.`volumes` ( `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'Primary Key', diff --git a/setup/db/db/schema-222to224.sql b/setup/db/db/schema-222to224.sql index 5f391be48e7..c6a136c28b1 100644 --- a/setup/db/db/schema-222to224.sql +++ b/setup/db/db/schema-222to224.sql @@ -61,6 +61,7 @@ ALTER TABLE `cloud`.`host` ADD INDEX `i_host__allocation_state`(`allocation_stat ALTER TABLE `cloud`.`domain` ADD INDEX `i_domain__path`(`path`); +<<<<<<< HEAD INSERT INTO `cloud`.`configuration` VALUES ('Advanced','DEFAULT','management-server','control.cidr','169.254.0.0/16','Changes the cidr for the control network traffic. Defaults to using link local. Must be unique within pods'), ('Advanced','DEFAULT','management-server','control.gateway','169.254.0.1','gateway for the control network traffic'), @@ -101,8 +102,5 @@ INSERT INTO `cloud`.`configuration` VALUES ('Advanced','DEFAULT','management-server','vmware.service.console','Service Console','Specify the service console network name (ESX host only)'), ('Advanced','DEFAULT','AgentManager','xapiwait','600','Time (in seconds) to wait for XAPI to return'); - - - - - +INSERT INTO `cloud`.`sequence` (name, value) VALUES ('checkpoint_seq', 1); +DELETE FROM `cloud`.`sequence` WHERE name='snapshots_seq'; diff --git a/setup/db/db/schema-snapshot-217to223.sql b/setup/db/db/schema-snapshot-217to223.sql index d0bed4503d5..3a9680b50c2 100644 --- a/setup/db/db/schema-snapshot-217to223.sql +++ b/setup/db/db/schema-snapshot-217to223.sql @@ -9,4 +9,4 @@ UPDATE snapshots s, volumes v SET s.data_center_id=v.data_center_id, s.domain_id UPDATE snapshots s, snapshot_policy sp, snapshot_policy_ref spr SET s.hypervisor_type=sp.interval+3 WHERE s.id=spr.snap_id and spr.policy_id=sp.id; DROP table snapshot_policy_ref; -DELETE FROM snapshot_policy WHERE id=1; \ No newline at end of file +DELETE FROM snapshot_policy WHERE id=1; diff --git a/utils/src/com/cloud/utils/db/Transaction.java b/utils/src/com/cloud/utils/db/Transaction.java index 7dc31ebcaff..20d15918333 100755 --- a/utils/src/com/cloud/utils/db/Transaction.java +++ b/utils/src/com/cloud/utils/db/Transaction.java @@ -69,6 +69,7 @@ public class Transaction { private static final String CREATE_TXN = "create_txn"; private static final String CREATE_CONN = "create_conn"; private static final String STATEMENT = "statement"; + private static final String ATTACHMENT = "attachment"; public static final short CLOUD_DB = 0; public static final short USAGE_DB = 1; @@ -89,8 +90,7 @@ public class Transaction { Transaction txn = tls.get(); assert txn != null : "No Transaction on stack. Did you mark the method with @DB?"; - // loosen the requirement to let people use explicit transaction management (i.e., in Unit tests) - // assert checkAnnotation(3, txn) : "Did you even read the guide to use Transaction...IOW...other people's code? Try method can't be private. What about @DB? hmmm... could that be it? " + txn.toString(); + assert checkAnnotation(3, txn) : "Did you even read the guide to use Transaction...IOW...other people's code? Try method can't be private. What about @DB? hmmm... could that be it? " + txn.toString(); return txn; } @@ -168,6 +168,39 @@ public class Transaction { s_logger.warn("Unexpected exception: ", e); return null; } + } + + protected void attach(TransactionAttachment value) { + _stack.push(new StackElement(ATTACHMENT, value)); + } + + protected TransactionAttachment detach(String name) { + Iterator it = _stack.descendingIterator(); + while (it.hasNext()) { + StackElement element = it.next(); + if (element.type == ATTACHMENT) { + TransactionAttachment att = (TransactionAttachment)element.ref; + if (name.equals(att.getName())) { + it.remove(); + return att; + } + } + } + assert false : "Are you sure you attached this: " + name; + return null; + } + + public static void attachToTxn(TransactionAttachment value) { + Transaction txn = tls.get(); + assert txn != null && txn.peekInStack(CURRENT_TXN) != null: "Come on....how can we attach something to the transaction if you haven't started it?"; + + txn.attach(value); + } + + public static TransactionAttachment detachFromTxn(String name) { + Transaction txn = tls.get(); + assert txn != null : "No Transaction in TLS"; + return txn.detach(name); } protected static boolean checkAnnotation(int stack, Transaction txn) { @@ -484,7 +517,6 @@ public class Transaction { closeConnection(); _stack.clear(); - if (_lockMaster != null) { _lockMaster.clear(); } @@ -608,45 +640,55 @@ public class Transaction { it.remove(); - if (item.type == type && (ref == null || item.ref == ref)) { - break; - } - - if (item.type == CURRENT_TXN) { - if (s_logger.isTraceEnabled()) { - s_logger.trace("Releasing the current txn: " + (item.ref != null ? item.ref : "")); + try { + if (item.type == type && (ref == null || item.ref == ref)) { + break; } - } else if (item.type == CREATE_CONN) { - closeConnection(); - } else if (item.type == START_TXN) { - if (item.ref == null) { - rollback = true; - } else { + + if (item.type == CURRENT_TXN) { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Releasing the current txn: " + (item.ref != null ? item.ref : "")); + } + } else if (item.type == CREATE_CONN) { + closeConnection(); + } else if (item.type == START_TXN) { + if (item.ref == null) { + rollback = true; + } else { + try { + _conn.rollback((Savepoint)ref); + rollback = false; + } catch (final SQLException e) { + s_logger.warn("Unable to rollback Txn.", e); + } + } + } else if (item.type == STATEMENT) { try { - _conn.rollback((Savepoint)ref); - rollback = false; - } catch (final SQLException e) { - s_logger.warn("Unable to rollback Txn.", e); - } - } - } else if (item.type == STATEMENT) { - try { - if (s_stmtLogger.isTraceEnabled()) { - s_stmtLogger.trace("Closing: " + ref.toString()); - } - Statement stmt = (Statement)ref; - try { - ResultSet rs = stmt.getResultSet(); - if (rs != null) { - rs.close(); + if (s_stmtLogger.isTraceEnabled()) { + s_stmtLogger.trace("Closing: " + ref.toString()); + } + Statement stmt = (Statement)ref; + try { + ResultSet rs = stmt.getResultSet(); + if (rs != null) { + rs.close(); + } + } catch(SQLException e) { + s_stmtLogger.trace("Unable to close resultset"); } - } catch(SQLException e) { - s_stmtLogger.trace("Unable to close resultset"); - } - stmt.close(); - } catch (final SQLException e) { - s_stmtLogger.trace("Unable to close statement: " + item.toString()); + stmt.close(); + } catch (final SQLException e) { + s_stmtLogger.trace("Unable to close statement: " + item.toString()); + } + } else if (item.type == ATTACHMENT) { + TransactionAttachment att = (TransactionAttachment)item.ref; + if (s_logger.isTraceEnabled()) { + s_logger.trace("Cleaning up " + att.getName()); + } + att.cleanup(); } + } catch(Exception e) { + s_logger.error("Unable to clean up " + item, e); } } diff --git a/utils/src/com/cloud/utils/db/TransactionAttachment.java b/utils/src/com/cloud/utils/db/TransactionAttachment.java new file mode 100644 index 00000000000..43f96101f07 --- /dev/null +++ b/utils/src/com/cloud/utils/db/TransactionAttachment.java @@ -0,0 +1,37 @@ +/** + * Copyright (C) 2010 Cloud.com, Inc. All rights reserved. + * + * This software is licensed under the GNU General Public License v3 or later. + * + * It is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + */ +package com.cloud.utils.db; + +/** + * TransactionAttachment are objects added to Transaction such that when + * the in memory transaction is closed, they are automatically closed. + * This is useful when the code needs to push something into TLS for a + * session but needs it to be cleanup when the session is done. + * + */ +public interface TransactionAttachment { + /** + * @return a unique name to be inserted. + */ + String getName(); + + /** + * cleanup() if it wasn't cleaned up before. + */ + void cleanup(); +}