cloudstack/server/src/com/cloud/cluster/CheckPointManagerImpl.java

247 lines
8.5 KiB
Java

/**
* Copyright (C) 2010 Cloud.com, Inc. All rights reserved.
*
* This software is licensed under the GNU General Public License v3 or later.
*
* It is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package com.cloud.cluster;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.ejb.Local;
import javax.naming.ConfigurationException;
import org.apache.log4j.Logger;
import com.cloud.cluster.dao.StackMaidDao;
import com.cloud.configuration.Config;
import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.serializer.SerializerHelper;
import com.cloud.utils.DateUtil;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.component.ComponentLocator;
import com.cloud.utils.component.Inject;
import com.cloud.utils.component.Manager;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.GlobalLock;
@Local(value=CheckPointManager.class)
public class CheckPointManagerImpl implements CheckPointManager, Manager, ClusterManagerListener {
private static final Logger s_logger = Logger.getLogger(CheckPointManagerImpl.class);
private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds
private int _cleanupRetryInterval;
private String _name;
@Inject
private StackMaidDao _maidDao;
@Inject
private ClusterManager _clusterMgr;
long _msId;
private final ScheduledExecutorService _cleanupScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Task-Cleanup"));
protected CheckPointManagerImpl() {
}
@Override
public boolean configure(String name, Map<String, Object> xmlParams) throws ConfigurationException {
_name = name;
if (s_logger.isInfoEnabled()) {
s_logger.info("Start configuring StackMaidManager : " + name);
}
StackMaid.init(ManagementServerNode.getManagementServerId());
_msId = ManagementServerNode.getManagementServerId();
_clusterMgr.registerListener(this);
ComponentLocator locator = ComponentLocator.getCurrentLocator();
ConfigurationDao configDao = locator.getDao(ConfigurationDao.class);
Map<String, String> params = configDao.getConfiguration(xmlParams);
_cleanupRetryInterval = NumbersUtil.parseInt(params.get(Config.TaskCleanupRetryInterval.key()), 600);
_maidDao.takeover(_msId, _msId);
return true;
}
private void cleanupLeftovers(List<CheckPointVO> l) {
for (CheckPointVO maid : l) {
if (StackMaid.doCleanup(maid)) {
_maidDao.expunge(maid.getId());
}
}
}
@Override
public void onManagementNodeIsolated() {
}
@DB
private Runnable getGCTask() {
return new Runnable() {
@Override
public void run() {
GlobalLock scanLock = GlobalLock.getInternLock("StackMaidManagerGC");
try {
if (scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) {
try {
reallyRun();
} finally {
scanLock.unlock();
}
}
} finally {
scanLock.releaseRef();
}
}
public void reallyRun() {
try {
Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - 7200000);
List<CheckPointVO> l = _maidDao.listLeftoversByCutTime(cutTime);
cleanupLeftovers(l);
} catch (Throwable e) {
s_logger.error("Unexpected exception when trying to execute queue item, ", e);
}
}
};
}
@Override
public boolean start() {
_cleanupScheduler.schedule(new CleanupTask(), _cleanupRetryInterval > 0 ? _cleanupRetryInterval : 600, TimeUnit.SECONDS);
return true;
}
@Override
public boolean stop() {
return true;
}
@Override
public String getName() {
return _name;
}
@Override
public void onManagementNodeJoined(List<ManagementServerHostVO> nodeList, long selfNodeId) {
// Nothing to do
}
@Override
public void onManagementNodeLeft(List<ManagementServerHostVO> nodeList, long selfNodeId) {
for (ManagementServerHostVO node : nodeList) {
if (_maidDao.takeover(node.getMsid(), selfNodeId)) {
s_logger.info("Taking over from " + node.getMsid());
_cleanupScheduler.execute(new CleanupTask());
}
}
}
@Override
@DB
public long pushCheckPoint(CleanupMaid context) {
long seq = _maidDao.pushCleanupDelegate(_msId, 0, context.getClass().getName(), context);
return seq;
}
@Override
@DB
public void updateCheckPointState(long taskId, CleanupMaid updatedContext) {
CheckPointVO task = _maidDao.createForUpdate();
task.setDelegate(updatedContext.getClass().getName());
task.setContext(SerializerHelper.toSerializedStringOld(updatedContext));
_maidDao.update(taskId, task);
}
@Override
@DB
public void popCheckPoint(long taskId) {
_maidDao.remove(taskId);
}
protected boolean cleanup(CheckPointVO task) {
s_logger.info("Cleaning up " + task);
CleanupMaid delegate = (CleanupMaid)SerializerHelper.fromSerializedString(task.getContext());
assert delegate.getClass().getName().equals(task.getDelegate()) : "Deserializer says " + delegate.getClass().getName() + " but it's suppose to be " + task.getDelegate();
int result = delegate.cleanup(this);
if (result <= 0) {
if (result == 0) {
s_logger.info("Successfully cleaned up " + task.getId());
} else {
s_logger.warn("Unsuccessful in cleaning up " + task + ". Procedure to cleanup manaully: " + delegate.getCleanupProcedure());
}
popCheckPoint(task.getId());
return true;
} else {
s_logger.error("Unable to cleanup " + task.getId());
return false;
}
}
class CleanupTask implements Runnable {
public CleanupTask() {
}
@Override
public void run() {
try {
List<CheckPointVO> tasks = _maidDao.listCleanupTasks(_msId);
List<CheckPointVO> retries = new ArrayList<CheckPointVO>();
for (CheckPointVO task : tasks) {
try {
if (!cleanup(task)) {
retries.add(task);
}
} catch (Exception e) {
s_logger.warn("Unable to clean up " + task, e);
}
}
if (retries.size() > 0) {
if (_cleanupRetryInterval > 0) {
_cleanupScheduler.schedule(this, _cleanupRetryInterval, TimeUnit.SECONDS);
} else {
for (CheckPointVO task : retries) {
s_logger.warn("Cleanup procedure for " + task + ": " + ((CleanupMaid)SerializerHelper.fromSerializedString(task.getContext())).getCleanupProcedure());
}
}
}
} catch (Exception e) {
s_logger.error("Unable to cleanup all of the tasks for " + _msId, e);
}
}
}
}