check in changes from 2.2.4

This commit is contained in:
Alex Huang 2011-04-01 10:40:40 -07:00
parent 075fba5899
commit 2230c24ab3
17 changed files with 247 additions and 160 deletions

View File

@ -7,8 +7,6 @@ import javax.naming.ConfigurationException;
import org.apache.log4j.Logger;
import com.cloud.alert.AlertAdapter;
import com.cloud.alert.AlertManager;
import com.cloud.cluster.ClusterManager;
import com.cloud.cluster.ClusterNodeJoinEventArgs;
import com.cloud.cluster.ClusterNodeLeftEventArgs;
@ -30,8 +28,9 @@ public class ClusterAlertAdapter implements AlertAdapter {
private ManagementServerHostDao _mshostDao;
public void onClusterAlert(Object sender, EventArgs args) {
if(s_logger.isDebugEnabled())
s_logger.debug("Receive cluster alert, EventArgs: " + args.getClass().getName());
if(s_logger.isDebugEnabled()) {
s_logger.debug("Receive cluster alert, EventArgs: " + args.getClass().getName());
}
if(args instanceof ClusterNodeJoinEventArgs) {
onClusterNodeJoined(sender, (ClusterNodeJoinEventArgs)args);
@ -43,13 +42,15 @@ public class ClusterAlertAdapter implements AlertAdapter {
}
private void onClusterNodeJoined(Object sender, ClusterNodeJoinEventArgs args) {
if(s_logger.isDebugEnabled())
s_logger.debug("Handle cluster node join alert, self node: " + args.getSelf());
if(s_logger.isDebugEnabled()) {
s_logger.debug("Handle cluster node join alert, self node: " + args.getSelf());
}
for(ManagementServerHostVO mshost : args.getJoinedNodes()) {
if(mshost.getId().longValue() == args.getSelf().longValue()) {
if(s_logger.isDebugEnabled())
s_logger.debug("Management server node " + mshost.getServiceIP() + " is up, send alert");
if(mshost.getId() == args.getSelf().longValue()) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Management server node " + mshost.getServiceIP() + " is up, send alert");
}
_alertMgr.sendAlert(AlertManager.ALERT_TYPE_MANAGMENT_NODE, 0, new Long(0),
"Management server node " + mshost.getServiceIP() + " is up", "");
@ -60,11 +61,12 @@ public class ClusterAlertAdapter implements AlertAdapter {
private void onClusterNodeLeft(Object sender, ClusterNodeLeftEventArgs args) {
if(s_logger.isDebugEnabled())
s_logger.debug("Handle cluster node left alert, self node: " + args.getSelf());
if(s_logger.isDebugEnabled()) {
s_logger.debug("Handle cluster node left alert, self node: " + args.getSelf());
}
for(ManagementServerHostVO mshost : args.getLeftNodes()) {
if(mshost.getId().longValue() != args.getSelf().longValue()) {
if(mshost.getId() != args.getSelf().longValue()) {
GlobalLock lock = GlobalLock.getInternLock("ManagementAlert." + mshost.getId());
try {
if(lock.lock(180)) {
@ -73,14 +75,16 @@ public class ClusterAlertAdapter implements AlertAdapter {
if(alertHost.getAlertCount() == 0) {
_mshostDao.increaseAlertCount(mshost.getId());
if(s_logger.isDebugEnabled())
s_logger.debug("Detected management server node " + mshost.getServiceIP() + " is down, send alert");
if(s_logger.isDebugEnabled()) {
s_logger.debug("Detected management server node " + mshost.getServiceIP() + " is down, send alert");
}
_alertMgr.sendAlert(AlertManager.ALERT_TYPE_MANAGMENT_NODE, 0, new Long(0),
"Management server node " + mshost.getServiceIP() + " is down", "");
} else {
if(s_logger.isDebugEnabled())
s_logger.debug("Detected management server node " + mshost.getServiceIP() + " is down, but alert has already been set");
if(s_logger.isDebugEnabled()) {
s_logger.debug("Detected management server node " + mshost.getServiceIP() + " is down, but alert has already been set");
}
}
} finally {
lock.unlock();
@ -97,14 +101,16 @@ public class ClusterAlertAdapter implements AlertAdapter {
public boolean configure(String name, Map<String, Object> params)
throws ConfigurationException {
if (s_logger.isInfoEnabled())
s_logger.info("Start configuring cluster alert manager : " + name);
if (s_logger.isInfoEnabled()) {
s_logger.info("Start configuring cluster alert manager : " + name);
}
ComponentLocator locator = ComponentLocator.getCurrentLocator();
_mshostDao = locator.getDao(ManagementServerHostDao.class);
if(_mshostDao == null)
throw new ConfigurationException("Unable to get " + ManagementServerHostDao.class.getName());
if(_mshostDao == null) {
throw new ConfigurationException("Unable to get " + ManagementServerHostDao.class.getName());
}
_alertMgr = locator.getManager(AlertManager.class);
if (_alertMgr == null) {

View File

@ -1,6 +1,5 @@
package com.cloud.cluster;
import com.cloud.utils.component.Manager;
/**
* TaskManager helps business logic deal with clustering failover.
@ -14,23 +13,22 @@ import com.cloud.utils.component.Manager;
* cleanup when the dead server resumes.
*
*/
public interface TaskManager extends Manager {
public interface CheckPointManager {
/**
* Adds a task with the context as to what the task is and the class
* responsible for cleaning up.
*
* @param context context information to be stored.
* @param cleaner clazz responsible for cleanup if the process was interrupted.
* @return task id.
* @return Check point id.
*/
long addTask(CleanupMaid context);
long pushCheckPoint(CleanupMaid context);
/**
* update the task with new context
* @param taskId
* @param updatedContext new updated context.
*/
void updateTask(long taskId, CleanupMaid updatedContext);
void updateCheckPointState(long taskId, CleanupMaid updatedContext);
/**
@ -38,5 +36,5 @@ public interface TaskManager extends Manager {
*
* @param taskId
*/
void taskCompleted(long taskId);
void popCheckPoint(long taskId);
}

View File

@ -21,13 +21,14 @@ import com.cloud.utils.DateUtil;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.component.ComponentLocator;
import com.cloud.utils.component.Inject;
import com.cloud.utils.component.Manager;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.GlobalLock;
@Local(value=TaskManager.class)
public class TaskManagerImpl implements TaskManager, ClusterManagerListener {
private static final Logger s_logger = Logger.getLogger(TaskManagerImpl.class);
@Local(value=CheckPointManager.class)
public class CheckPointManagerImpl implements CheckPointManager, Manager, ClusterManagerListener {
private static final Logger s_logger = Logger.getLogger(CheckPointManagerImpl.class);
private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds
private static final int GC_INTERVAL = 10000; // 10 seconds
@ -47,7 +48,7 @@ public class TaskManagerImpl implements TaskManager, ClusterManagerListener {
private final ScheduledExecutorService _cleanupScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Task-Cleanup"));
protected TaskManagerImpl() {
protected CheckPointManagerImpl() {
}
@Override
@ -73,8 +74,8 @@ public class TaskManagerImpl implements TaskManager, ClusterManagerListener {
return true;
}
private void cleanupLeftovers(List<TaskVO> l) {
for (TaskVO maid : l) {
private void cleanupLeftovers(List<CheckPointVO> l) {
for (CheckPointVO maid : l) {
if (StackMaid.doCleanup(maid)) {
_maidDao.expunge(maid.getId());
}
@ -103,7 +104,7 @@ public class TaskManagerImpl implements TaskManager, ClusterManagerListener {
public void reallyRun() {
try {
Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - 7200000);
List<TaskVO> l = _maidDao.listLeftoversByCutTime(cutTime);
List<CheckPointVO> l = _maidDao.listLeftoversByCutTime(cutTime);
cleanupLeftovers(l);
} catch (Throwable e) {
s_logger.error("Unexpected exception when trying to execute queue item, ", e);
@ -152,24 +153,24 @@ public class TaskManagerImpl implements TaskManager, ClusterManagerListener {
}
@Override
public long addTask(CleanupMaid context) {
return _maidDao.pushCleanupDelegate(_msId, 0, context.getClass().getName(), context);
public long pushCheckPoint(CleanupMaid context) {
long seq = _maidDao.pushCleanupDelegate(_msId, 0, context.getClass().getName(), context);
}
@Override
public void updateTask(long taskId, CleanupMaid updatedContext) {
TaskVO task = _maidDao.createForUpdate();
public void updateCheckPointState(long taskId, CleanupMaid updatedContext) {
CheckPointVO task = _maidDao.createForUpdate();
task.setDelegate(updatedContext.getClass().getName());
task.setContext(SerializerHelper.toSerializedStringOld(updatedContext));
_maidDao.update(taskId, task);
}
@Override
public void taskCompleted(long taskId) {
public void popCheckPoint(long taskId) {
_maidDao.remove(taskId);
}
protected boolean cleanup(TaskVO task) {
protected boolean cleanup(CheckPointVO task) {
s_logger.info("Cleaning up " + task);
CleanupMaid delegate = (CleanupMaid)SerializerHelper.fromSerializedString(task.getContext());
assert delegate.getClass().getName().equals(task.getDelegate()) : "Deserializer says " + delegate.getClass().getName() + " but it's suppose to be " + task.getDelegate();
@ -181,7 +182,7 @@ public class TaskManagerImpl implements TaskManager, ClusterManagerListener {
} else {
s_logger.warn("Unsuccessful in cleaning up " + task + ". Procedure to cleanup manaully: " + delegate.getCleanupProcedure());
}
taskCompleted(task.getId());
popCheckPoint(task.getId());
return true;
} else {
s_logger.error("Unable to cleanup " + task.getId());
@ -197,11 +198,11 @@ public class TaskManagerImpl implements TaskManager, ClusterManagerListener {
@Override
public void run() {
try {
List<TaskVO> tasks = _maidDao.listCleanupTasks(_msId);
List<CheckPointVO> tasks = _maidDao.listCleanupTasks(_msId);
List<TaskVO> retries = new ArrayList<TaskVO>();
List<CheckPointVO> retries = new ArrayList<CheckPointVO>();
for (TaskVO task : tasks) {
for (CheckPointVO task : tasks) {
try {
if (!cleanup(task)) {
retries.add(task);
@ -216,7 +217,7 @@ public class TaskManagerImpl implements TaskManager, ClusterManagerListener {
if (_cleanupRetryInterval > 0) {
_cleanupScheduler.schedule(this, _cleanupRetryInterval, TimeUnit.SECONDS);
} else {
for (TaskVO task : retries) {
for (CheckPointVO task : retries) {
s_logger.warn("Cleanup procedure for " + task + ": " + ((CleanupMaid)SerializerHelper.fromSerializedString(task.getContext())).getCleanupProcedure());
}
}

View File

@ -13,7 +13,7 @@ import com.cloud.utils.db.GenericDao;
@Entity
@Table(name="stack_maid")
public class TaskVO {
public class CheckPointVO {
@Id
@GeneratedValue(strategy=GenerationType.IDENTITY)
@ -27,7 +27,7 @@ public class TaskVO {
private long threadId;
@Column(name="seq")
private int seq;
private long seq;
@Column(name="cleanup_delegate", length=128)
private String delegate;
@ -38,7 +38,11 @@ public class TaskVO {
@Column(name=GenericDao.CREATED_COLUMN)
private Date created;
public TaskVO() {
public CheckPointVO() {
}
public CheckPointVO(long seq) {
this.seq = seq;
}
public long getId() {
@ -61,11 +65,11 @@ public class TaskVO {
this.threadId = threadId;
}
public int getSeq() {
public long getSeq() {
return seq;
}
public void setSeq(int seq) {
public void setSeq(long seq) {
this.seq = seq;
}

View File

@ -670,7 +670,7 @@ public class ClusterManagerImpl implements ClusterManager {
private static boolean isIdInList(Long id, List<ManagementServerHostVO> l) {
for(ManagementServerHostVO mshost : l) {
if(mshost.getId() != null && mshost.getId() == id) {
if(mshost.getId() == id) {
return true;
}
}

View File

@ -18,17 +18,17 @@
package com.cloud.cluster;
import java.util.Date;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
import javax.persistence.Temporal;
import javax.persistence.TemporalType;
import java.util.Date;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
import javax.persistence.Temporal;
import javax.persistence.TemporalType;
import com.cloud.utils.db.GenericDao;
@Entity
@ -38,10 +38,10 @@ public class ManagementServerHostVO {
@Id
@GeneratedValue(strategy=GenerationType.IDENTITY)
@Column(name="id")
private Long id;
private long id;
@Column(name="msid", updatable=true, nullable=false)
private Long msid;
private long msid;
@Column(name="name", updatable=true, nullable=true)
private String name;
@ -75,19 +75,19 @@ public class ManagementServerHostVO {
this.lastUpdateTime = updateTime;
}
public Long getId() {
public long getId() {
return id;
}
public void setId(Long id) {
public void setId(long id) {
this.id = id;
}
public Long getMsid() {
public long getMsid() {
return msid;
}
public void setMsid(Long msid) {
public void setMsid(long msid) {
this.msid = msid;
}

View File

@ -99,7 +99,7 @@ public class StackMaid {
public void exitCleanup(long currentMsid) {
if(currentSeq > 0) {
TaskVO maid = null;
CheckPointVO maid = null;
while((maid = maidDao.popCleanupDelegate(currentMsid)) != null) {
doCleanup(maid);
}
@ -109,7 +109,7 @@ public class StackMaid {
context.clear();
}
public static boolean doCleanup(TaskVO maid) {
public static boolean doCleanup(CheckPointVO maid) {
if(maid.getDelegate() != null) {
try {
Class<?> clz = Class.forName(maid.getDelegate());

View File

@ -3,16 +3,16 @@ package com.cloud.cluster.dao;
import java.util.Date;
import java.util.List;
import com.cloud.cluster.TaskVO;
import com.cloud.cluster.CheckPointVO;
import com.cloud.utils.db.GenericDao;
public interface StackMaidDao extends GenericDao<TaskVO, Long> {
public interface StackMaidDao extends GenericDao<CheckPointVO, Long> {
public long pushCleanupDelegate(long msid, int seq, String delegateClzName, Object context);
public TaskVO popCleanupDelegate(long msid);
public CheckPointVO popCleanupDelegate(long msid);
public void clearStack(long msid);
public List<TaskVO> listLeftoversByMsid(long msid);
public List<TaskVO> listLeftoversByCutTime(Date cutTime);
public List<CheckPointVO> listLeftoversByMsid(long msid);
public List<CheckPointVO> listLeftoversByCutTime(Date cutTime);
/**
* Take over the task items of another management server and clean them up.
@ -26,5 +26,5 @@ public interface StackMaidDao extends GenericDao<TaskVO, Long> {
*/
boolean takeover(long takeOverMsid, long selfId);
List<TaskVO> listCleanupTasks(long selfId);
List<CheckPointVO> listCleanupTasks(long selfId);
}

View File

@ -12,7 +12,7 @@ import javax.ejb.Local;
import org.apache.log4j.Logger;
import com.cloud.cluster.TaskVO;
import com.cloud.cluster.CheckPointVO;
import com.cloud.serializer.SerializerHelper;
import com.cloud.utils.DateUtil;
import com.cloud.utils.db.DB;
@ -24,12 +24,12 @@ import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.utils.db.Transaction;
@Local(value = { StackMaidDao.class }) @DB(txn=false)
public class StackMaidDaoImpl extends GenericDaoBase<TaskVO, Long> implements StackMaidDao {
public class StackMaidDaoImpl extends GenericDaoBase<CheckPointVO, Long> implements StackMaidDao {
private static final Logger s_logger = Logger.getLogger(StackMaidDaoImpl.class);
private SearchBuilder<TaskVO> popSearch;
private SearchBuilder<TaskVO> clearSearch;
private final SearchBuilder<TaskVO> AllFieldsSearch;
private SearchBuilder<CheckPointVO> popSearch;
private SearchBuilder<CheckPointVO> clearSearch;
private final SearchBuilder<CheckPointVO> AllFieldsSearch;
public StackMaidDaoImpl() {
popSearch = createSearchBuilder();
@ -47,19 +47,19 @@ public class StackMaidDaoImpl extends GenericDaoBase<TaskVO, Long> implements St
@Override
public boolean takeover(long takeOverMsid, long selfId) {
TaskVO task = createForUpdate();
CheckPointVO task = createForUpdate();
task.setMsid(selfId);
task.setThreadId(0);
SearchCriteria<TaskVO> sc = AllFieldsSearch.create();
SearchCriteria<CheckPointVO> sc = AllFieldsSearch.create();
sc.setParameters("msid", takeOverMsid);
return update(task, sc) > 0;
}
@Override
public List<TaskVO> listCleanupTasks(long msId) {
SearchCriteria<TaskVO> sc = AllFieldsSearch.create();
public List<CheckPointVO> listCleanupTasks(long msId) {
SearchCriteria<CheckPointVO> sc = AllFieldsSearch.create();
sc.setParameters("msid", msId);
sc.setParameters("thread", 0);
@ -68,7 +68,7 @@ public class StackMaidDaoImpl extends GenericDaoBase<TaskVO, Long> implements St
@Override
public long pushCleanupDelegate(long msid, int seq, String delegateClzName, Object context) {
TaskVO delegateItem = new TaskVO();
CheckPointVO delegateItem = new CheckPointVO();
delegateItem.setMsid(msid);
delegateItem.setThreadId(Thread.currentThread().getId());
delegateItem.setSeq(seq);
@ -81,13 +81,13 @@ public class StackMaidDaoImpl extends GenericDaoBase<TaskVO, Long> implements St
}
@Override
public TaskVO popCleanupDelegate(long msid) {
SearchCriteria<TaskVO> sc = popSearch.create();
public CheckPointVO popCleanupDelegate(long msid) {
SearchCriteria<CheckPointVO> sc = popSearch.create();
sc.setParameters("msid", msid);
sc.setParameters("threadId", Thread.currentThread().getId());
Filter filter = new Filter(TaskVO.class, "seq", false, 0L, (long)1);
List<TaskVO> l = listIncludingRemovedBy(sc, filter);
Filter filter = new Filter(CheckPointVO.class, "seq", false, 0L, (long)1);
List<CheckPointVO> l = listIncludingRemovedBy(sc, filter);
if(l != null && l.size() > 0) {
expunge(l.get(0).getId());
return l.get(0);
@ -98,7 +98,7 @@ public class StackMaidDaoImpl extends GenericDaoBase<TaskVO, Long> implements St
@Override
public void clearStack(long msid) {
SearchCriteria<TaskVO> sc = clearSearch.create();
SearchCriteria<CheckPointVO> sc = clearSearch.create();
sc.setParameters("msid", msid);
expunge(sc);
@ -106,11 +106,11 @@ public class StackMaidDaoImpl extends GenericDaoBase<TaskVO, Long> implements St
@Override
@DB
public List<TaskVO> listLeftoversByMsid(long msid) {
List<TaskVO> l = new ArrayList<TaskVO>();
public List<CheckPointVO> listLeftoversByMsid(long msid) {
List<CheckPointVO> l = new ArrayList<CheckPointVO>();
String sql = "select * from stack_maid where msid=? order by msid asc, thread_id asc, seq desc";
Transaction txn = Transaction.open(Transaction.CLOUD_DB);
Transaction txn = Transaction.currentTxn();
PreparedStatement pstmt = null;
try {
pstmt = txn.prepareAutoCloseStatement(sql);
@ -132,9 +132,9 @@ public class StackMaidDaoImpl extends GenericDaoBase<TaskVO, Long> implements St
@Override
@DB
public List<TaskVO> listLeftoversByCutTime(Date cutTime) {
public List<CheckPointVO> listLeftoversByCutTime(Date cutTime) {
List<TaskVO> l = new ArrayList<TaskVO>();
List<CheckPointVO> l = new ArrayList<CheckPointVO>();
String sql = "select * from stack_maid where created < ? order by msid asc, thread_id asc, seq desc";
Transaction txn = Transaction.open(Transaction.CLOUD_DB);

View File

@ -39,7 +39,7 @@ import com.cloud.certificate.dao.CertificateDaoImpl;
import com.cloud.cluster.ClusterManagerImpl;
import com.cloud.cluster.DummyClusterManagerImpl;
import com.cloud.cluster.ManagementServerNode;
import com.cloud.cluster.TaskManagerImpl;
import com.cloud.cluster.CheckPointManagerImpl;
import com.cloud.cluster.dao.ManagementServerHostDaoImpl;
import com.cloud.cluster.dao.StackMaidDaoImpl;
import com.cloud.configuration.dao.ConfigurationDaoImpl;
@ -273,7 +273,7 @@ public class DefaultComponentLibrary extends ComponentLibraryBase implements Com
}
protected void populateManagers() {
addManager("StackMaidManager", TaskManagerImpl.class);
addManager("StackMaidManager", CheckPointManagerImpl.class);
addManager("agent manager", AgentManagerImpl.class);
addManager("account manager", AccountManagerImpl.class);
addManager("configuration manager", ConfigurationManagerImpl.class);

View File

@ -27,7 +27,7 @@ import junit.framework.Assert;
import com.cloud.async.AsyncJobVO;
import com.cloud.cluster.StackMaid;
import com.cloud.cluster.TaskVO;
import com.cloud.cluster.CheckPointVO;
import com.cloud.cluster.dao.StackMaidDao;
import com.cloud.cluster.dao.StackMaidDaoImpl;
import com.cloud.serializer.Param;
@ -216,7 +216,7 @@ public class TestAsync extends Log4jEnabledTestCase {
dao.pushCleanupDelegate(1L, 1, "delegate2", new Long(100));
dao.pushCleanupDelegate(1L, 2, "delegate3", null);
TaskVO item = dao.popCleanupDelegate(1L);
CheckPointVO item = dao.popCleanupDelegate(1L);
Assert.assertTrue(item.getDelegate().equals("delegate3"));
Assert.assertTrue(item.getContext() == null);
@ -283,8 +283,8 @@ public class TestAsync extends Log4jEnabledTestCase {
Transaction txn = Transaction.open(Transaction.CLOUD_DB);
StackMaidDao dao = new StackMaidDaoImpl();
List<TaskVO> l = dao.listLeftoversByMsid(1L);
for(TaskVO maid : l) {
List<CheckPointVO> l = dao.listLeftoversByMsid(1L);
for(CheckPointVO maid : l) {
s_logger.info("" + maid.getThreadId() + " " + maid.getDelegate() + " " + maid.getContext());
}

View File

@ -52,8 +52,8 @@ import com.cloud.utils.component.MockComponentLocator;
import com.cloud.utils.db.Transaction;
import com.cloud.utils.exception.CloudRuntimeException;
public class TaskManagerTest extends TestCase {
private final static Logger s_logger = Logger.getLogger(TaskManagerTest.class);
public class CheckPointManagerTest extends TestCase {
private final static Logger s_logger = Logger.getLogger(CheckPointManagerTest.class);
@Override
@Before
@ -85,46 +85,46 @@ public class TaskManagerTest extends TestCase {
public void testCompleteCase() throws Exception {
ComponentLocator locator = ComponentLocator.getCurrentLocator();
TaskManagerImpl taskMgr = ComponentLocator.inject(TaskManagerImpl.class);
CheckPointManagerImpl taskMgr = ComponentLocator.inject(CheckPointManagerImpl.class);
assertTrue(taskMgr.configure("TaskManager", new HashMap<String, Object>()));
assertTrue(taskMgr.start());
MockMaid delegate = new MockMaid();
delegate.setValue("first");
long taskId = taskMgr.addTask(delegate);
long taskId = taskMgr.pushCheckPoint(delegate);
StackMaidDao maidDao = locator.getDao(StackMaidDao.class);
TaskVO task = maidDao.findById(taskId);
CheckPointVO task = maidDao.findById(taskId);
assertEquals(task.getDelegate(), MockMaid.class.getName());
MockMaid retrieved = (MockMaid)SerializerHelper.fromSerializedString(task.getContext());
assertEquals(retrieved.getValue(), delegate.getValue());
delegate.setValue("second");
taskMgr.updateTask(taskId, delegate);
taskMgr.updateCheckPointState(taskId, delegate);
task = maidDao.findById(taskId);
assertEquals(task.getDelegate(), MockMaid.class.getName());
retrieved = (MockMaid)SerializerHelper.fromSerializedString(task.getContext());
assertEquals(retrieved.getValue(), delegate.getValue());
taskMgr.taskCompleted(taskId);
taskMgr.popCheckPoint(taskId);
assertNull(maidDao.findById(taskId));
}
public void testSimulatedReboot() throws Exception {
ComponentLocator locator = ComponentLocator.getCurrentLocator();
TaskManagerImpl taskMgr = ComponentLocator.inject(TaskManagerImpl.class);
CheckPointManagerImpl taskMgr = ComponentLocator.inject(CheckPointManagerImpl.class);
assertTrue(taskMgr.configure("TaskManager", new HashMap<String, Object>()));
assertTrue(taskMgr.start());
MockMaid maid = new MockMaid();
maid.setValue("first");
long taskId = taskMgr.addTask(maid);
long taskId = taskMgr.pushCheckPoint(maid);
StackMaidDao maidDao = locator.getDao(StackMaidDao.class);
TaskVO task = maidDao.findById(taskId);
CheckPointVO task = maidDao.findById(taskId);
assertEquals(task.getDelegate(), MockMaid.class.getName());
MockMaid retrieved = (MockMaid)SerializerHelper.fromSerializedString(task.getContext());
@ -134,7 +134,7 @@ public class TaskManagerTest extends TestCase {
assertNotNull(MockMaid.map.get(maid.getSeq()));
taskMgr = ComponentLocator.inject(TaskManagerImpl.class);
taskMgr = ComponentLocator.inject(CheckPointManagerImpl.class);
HashMap<String, Object> params = new HashMap<String, Object>();
params.put(Config.TaskCleanupRetryInterval.key(), "1");
taskMgr.configure("TaskManager", params);
@ -151,16 +151,16 @@ public class TaskManagerTest extends TestCase {
public void testTakeover() throws Exception {
ComponentLocator locator = ComponentLocator.getCurrentLocator();
TaskManagerImpl taskMgr = ComponentLocator.inject(TaskManagerImpl.class);
CheckPointManagerImpl taskMgr = ComponentLocator.inject(CheckPointManagerImpl.class);
assertTrue(taskMgr.configure("TaskManager", new HashMap<String, Object>()));
assertTrue(taskMgr.start());
MockMaid delegate = new MockMaid();
delegate.setValue("first");
long taskId = taskMgr.addTask(delegate);
long taskId = taskMgr.pushCheckPoint(delegate);
StackMaidDao maidDao = locator.getDao(StackMaidDao.class);
TaskVO task = maidDao.findById(taskId);
CheckPointVO task = maidDao.findById(taskId);
assertEquals(task.getDelegate(), MockMaid.class.getName());
MockMaid retrieved = (MockMaid)SerializerHelper.fromSerializedString(task.getContext());

View File

@ -362,6 +362,7 @@ INSERT INTO `cloud`.`sequence` (name, value) VALUES ('private_mac_address_seq',
INSERT INTO `cloud`.`sequence` (name, value) VALUES ('storage_pool_seq', 200);
INSERT INTO `cloud`.`sequence` (name, value) VALUES ('volume_seq', 1);
INSERT INTO `cloud`.`sequence` (name, value) VALUES ('networks_seq', 200);
INSERT INTO `cloud`.`sequence` (name, value) VALUES ('checkpoint_seq', 1);
CREATE TABLE `cloud`.`volumes` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'Primary Key',

View File

@ -61,6 +61,7 @@ ALTER TABLE `cloud`.`host` ADD INDEX `i_host__allocation_state`(`allocation_stat
ALTER TABLE `cloud`.`domain` ADD INDEX `i_domain__path`(`path`);
<<<<<<< HEAD
INSERT INTO `cloud`.`configuration` VALUES
('Advanced','DEFAULT','management-server','control.cidr','169.254.0.0/16','Changes the cidr for the control network traffic. Defaults to using link local. Must be unique within pods'),
('Advanced','DEFAULT','management-server','control.gateway','169.254.0.1','gateway for the control network traffic'),
@ -101,8 +102,5 @@ INSERT INTO `cloud`.`configuration` VALUES
('Advanced','DEFAULT','management-server','vmware.service.console','Service Console','Specify the service console network name (ESX host only)'),
('Advanced','DEFAULT','AgentManager','xapiwait','600','Time (in seconds) to wait for XAPI to return');
INSERT INTO `cloud`.`sequence` (name, value) VALUES ('checkpoint_seq', 1);
DELETE FROM `cloud`.`sequence` WHERE name='snapshots_seq';

View File

@ -9,4 +9,4 @@ UPDATE snapshots s, volumes v SET s.data_center_id=v.data_center_id, s.domain_id
UPDATE snapshots s, snapshot_policy sp, snapshot_policy_ref spr SET s.hypervisor_type=sp.interval+3 WHERE s.id=spr.snap_id and spr.policy_id=sp.id;
DROP table snapshot_policy_ref;
DELETE FROM snapshot_policy WHERE id=1;
DELETE FROM snapshot_policy WHERE id=1;

View File

@ -69,6 +69,7 @@ public class Transaction {
private static final String CREATE_TXN = "create_txn";
private static final String CREATE_CONN = "create_conn";
private static final String STATEMENT = "statement";
private static final String ATTACHMENT = "attachment";
public static final short CLOUD_DB = 0;
public static final short USAGE_DB = 1;
@ -89,8 +90,7 @@ public class Transaction {
Transaction txn = tls.get();
assert txn != null : "No Transaction on stack. Did you mark the method with @DB?";
// loosen the requirement to let people use explicit transaction management (i.e., in Unit tests)
// assert checkAnnotation(3, txn) : "Did you even read the guide to use Transaction...IOW...other people's code? Try method can't be private. What about @DB? hmmm... could that be it? " + txn.toString();
assert checkAnnotation(3, txn) : "Did you even read the guide to use Transaction...IOW...other people's code? Try method can't be private. What about @DB? hmmm... could that be it? " + txn.toString();
return txn;
}
@ -168,6 +168,39 @@ public class Transaction {
s_logger.warn("Unexpected exception: ", e);
return null;
}
}
protected void attach(TransactionAttachment value) {
_stack.push(new StackElement(ATTACHMENT, value));
}
protected TransactionAttachment detach(String name) {
Iterator<StackElement> it = _stack.descendingIterator();
while (it.hasNext()) {
StackElement element = it.next();
if (element.type == ATTACHMENT) {
TransactionAttachment att = (TransactionAttachment)element.ref;
if (name.equals(att.getName())) {
it.remove();
return att;
}
}
}
assert false : "Are you sure you attached this: " + name;
return null;
}
public static void attachToTxn(TransactionAttachment value) {
Transaction txn = tls.get();
assert txn != null && txn.peekInStack(CURRENT_TXN) != null: "Come on....how can we attach something to the transaction if you haven't started it?";
txn.attach(value);
}
public static TransactionAttachment detachFromTxn(String name) {
Transaction txn = tls.get();
assert txn != null : "No Transaction in TLS";
return txn.detach(name);
}
protected static boolean checkAnnotation(int stack, Transaction txn) {
@ -484,7 +517,6 @@ public class Transaction {
closeConnection();
_stack.clear();
if (_lockMaster != null) {
_lockMaster.clear();
}
@ -608,45 +640,55 @@ public class Transaction {
it.remove();
if (item.type == type && (ref == null || item.ref == ref)) {
break;
}
if (item.type == CURRENT_TXN) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Releasing the current txn: " + (item.ref != null ? item.ref : ""));
try {
if (item.type == type && (ref == null || item.ref == ref)) {
break;
}
} else if (item.type == CREATE_CONN) {
closeConnection();
} else if (item.type == START_TXN) {
if (item.ref == null) {
rollback = true;
} else {
if (item.type == CURRENT_TXN) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Releasing the current txn: " + (item.ref != null ? item.ref : ""));
}
} else if (item.type == CREATE_CONN) {
closeConnection();
} else if (item.type == START_TXN) {
if (item.ref == null) {
rollback = true;
} else {
try {
_conn.rollback((Savepoint)ref);
rollback = false;
} catch (final SQLException e) {
s_logger.warn("Unable to rollback Txn.", e);
}
}
} else if (item.type == STATEMENT) {
try {
_conn.rollback((Savepoint)ref);
rollback = false;
} catch (final SQLException e) {
s_logger.warn("Unable to rollback Txn.", e);
}
}
} else if (item.type == STATEMENT) {
try {
if (s_stmtLogger.isTraceEnabled()) {
s_stmtLogger.trace("Closing: " + ref.toString());
}
Statement stmt = (Statement)ref;
try {
ResultSet rs = stmt.getResultSet();
if (rs != null) {
rs.close();
if (s_stmtLogger.isTraceEnabled()) {
s_stmtLogger.trace("Closing: " + ref.toString());
}
Statement stmt = (Statement)ref;
try {
ResultSet rs = stmt.getResultSet();
if (rs != null) {
rs.close();
}
} catch(SQLException e) {
s_stmtLogger.trace("Unable to close resultset");
}
} catch(SQLException e) {
s_stmtLogger.trace("Unable to close resultset");
}
stmt.close();
} catch (final SQLException e) {
s_stmtLogger.trace("Unable to close statement: " + item.toString());
stmt.close();
} catch (final SQLException e) {
s_stmtLogger.trace("Unable to close statement: " + item.toString());
}
} else if (item.type == ATTACHMENT) {
TransactionAttachment att = (TransactionAttachment)item.ref;
if (s_logger.isTraceEnabled()) {
s_logger.trace("Cleaning up " + att.getName());
}
att.cleanup();
}
} catch(Exception e) {
s_logger.error("Unable to clean up " + item, e);
}
}

View File

@ -0,0 +1,37 @@
/**
* Copyright (C) 2010 Cloud.com, Inc. All rights reserved.
*
* This software is licensed under the GNU General Public License v3 or later.
*
* It is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package com.cloud.utils.db;
/**
* TransactionAttachment are objects added to Transaction such that when
* the in memory transaction is closed, they are automatically closed.
* This is useful when the code needs to push something into TLS for a
* session but needs it to be cleanup when the session is done.
*
*/
public interface TransactionAttachment {
/**
* @return a unique name to be inserted.
*/
String getName();
/**
* cleanup() if it wasn't cleaned up before.
*/
void cleanup();
}