propagated db connection savings from master

This commit is contained in:
Alex Huang 2011-07-18 14:22:31 -07:00
parent e52a97b969
commit 392c03ce6f
5 changed files with 34 additions and 98 deletions

View File

@ -289,7 +289,7 @@ public abstract class AgentAttache {
}
}
_agentMgr.notifyAnswersToMonitors(_id, seq, answers);
_agentMgr.notifyAnswersToMonitors(_id, seq, answers);
} finally {
// we should always trigger next command execution, even in failure cases - otherwise in exception case all the remaining will be stuck in the sync queue forever
@ -339,7 +339,7 @@ public abstract class AgentAttache {
if (listener != null) {
registerListener(seq, listener);
} else if (s_logger.isDebugEnabled()) {
s_logger.debug(log(seq, "Routing: "));
s_logger.debug(log(seq, "Routed from " + req.getManagementServerId()));
}
synchronized(this) {

View File

@ -68,6 +68,7 @@ import com.cloud.utils.component.Adapters;
import com.cloud.utils.component.ComponentLocator;
import com.cloud.utils.component.Inject;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.db.ConnectionConcierge;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.Transaction;
import com.cloud.utils.events.SubscriptionMgr;
@ -102,7 +103,7 @@ public class ClusterManagerImpl implements ClusterManager {
private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Cluster-Heartbeat"));
private final ExecutorService _notificationExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("Cluster-Notification"));
private final List<ClusterManagerMessage> _notificationMsgs = new ArrayList<ClusterManagerMessage>();
private Connection _heartbeatConnection = null;
private ConnectionConcierge _heartbeatConnection = null;
private final ExecutorService _executor;
@ -647,23 +648,20 @@ public class ClusterManagerImpl implements ClusterManager {
}
private Connection getHeartbeatConnection() throws SQLException {
if(_heartbeatConnection != null) {
return _heartbeatConnection;
if(_heartbeatConnection == null) {
Connection conn = Transaction.getStandaloneConnectionWithException();
_heartbeatConnection = new ConnectionConcierge("ClusterManagerHeartBeat", conn, false, false);
}
_heartbeatConnection = Transaction.getStandaloneConnectionWithException();
return _heartbeatConnection;
return _heartbeatConnection.conn();
}
private void invalidHeartbeatConnection() {
if(_heartbeatConnection != null) {
try {
_heartbeatConnection.close();
} catch (SQLException e) {
s_logger.warn("Unable to close hearbeat DB connection. ", e);
Connection conn = Transaction.getStandaloneConnection();
if (conn != null) {
_heartbeatConnection.reset(Transaction.getStandaloneConnection());
}
_heartbeatConnection = null;
}
}

View File

@ -118,7 +118,6 @@ import com.cloud.hypervisor.Hypervisor.HypervisorType;
import com.cloud.hypervisor.HypervisorGuruManager;
import com.cloud.network.NetworkManager;
import com.cloud.network.router.VirtualNetworkApplianceManager;
import com.cloud.offering.DiskOffering;
import com.cloud.org.Grouping;
import com.cloud.server.ManagementServer;
import com.cloud.service.ServiceOfferingVO;
@ -1663,7 +1662,7 @@ public class StorageManagerImpl implements StorageManager, StorageService, Manag
if (diskOffering.getDiskSize() > 0) {
size = diskOffering.getDiskSize();
}
}
if (!validateVolumeSizeRange(size)) {// convert size from mb to gb for validation
throw new InvalidParameterValueException("Invalid size for custom volume creation: " + size + " ,max volume size is:" + _maxVolumeSizeInGb);
@ -1677,7 +1676,7 @@ public class StorageManagerImpl implements StorageManager, StorageService, Manag
if (snapshotCheck.getStatus() != Snapshot.Status.BackedUp) {
throw new InvalidParameterValueException("Snapshot id=" + snapshotId + " is not in " + Snapshot.Status.BackedUp + " state yet and can't be used for volume creation");
}
}
diskOfferingId = (cmd.getDiskOfferingId() != null) ? cmd.getDiskOfferingId() : snapshotCheck.getDiskOfferingId();
zoneId = snapshotCheck.getDataCenterId();
@ -2602,7 +2601,7 @@ public class StorageManagerImpl implements StorageManager, StorageService, Manag
}
if (assignedPool != null) {
Volume.State state = vol.getState();
if (state == Volume.State.Allocated) {
if (state == Volume.State.Allocated || state == Volume.State.Creating) {
recreateVols.add(vol);
} else {
if (vol.isRecreatable()) {
@ -2624,7 +2623,7 @@ public class StorageManagerImpl implements StorageManager, StorageService, Manag
}
if (s_logger.isDebugEnabled()) {
s_logger.debug("No need to recreate the volume: "+vol+ ", since it already has a pool assigned: "+vol.getPoolId()+", adding disk to VM");
}
}
StoragePoolVO pool = _storagePoolDao.findById(vol.getPoolId());
vm.addDisk(new VolumeTO(vol, pool));
}
@ -2689,7 +2688,7 @@ public class StorageManagerImpl implements StorageManager, StorageService, Manag
txn.start();
_volsDao.update(existingVolume, Volume.Event.Destroy);
Long templateIdToUse = null;
Long templateIdToUse = null;
Long volTemplateId = existingVolume.getTemplateId();
long vmTemplateId = vm.getTemplateId();
if (volTemplateId != null && volTemplateId.longValue() != vmTemplateId) {

View File

@ -27,16 +27,12 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.management.StandardMBean;
import org.apache.log4j.Logger;
import com.cloud.utils.DateUtil;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.mgmt.JmxUtil;
import com.cloud.utils.time.InaccurateClock;
@ -62,15 +58,18 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
private long _msId;
private static Merovingian2 s_instance = null;
ScheduledExecutorService _executor = null;
Connection _conn = null;
ConnectionConcierge _concierge = null;
private Merovingian2(long msId) {
super(MerovingianMBean.class, false);
_msId = msId;
String result = resetDbConnection();
if (!result.equalsIgnoreCase("Success")) {
throw new CloudRuntimeException("Unable to initialize a connection to the database for locking purposes due to " + result);
Connection conn = null;
try {
conn = Transaction.getStandaloneConnectionWithException();
_concierge = new ConnectionConcierge("LockMaster", conn, true, true);
} catch (SQLException e) {
s_logger.error("Unable to get a new db connection", e);
throw new CloudRuntimeException("Unable to initialize a connection to the database for locking purposes: ", e);
}
}
@ -90,43 +89,6 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
return s_instance;
}
@Override
public String resetDbConnection() {
s_logger.info("Resetting the database connection for locks");
if (_conn != null) {
try {
_conn.close();
} catch (Throwable th) {
s_logger.error("Unable to close connection", th);
}
}
try {
_conn = Transaction.getStandaloneConnectionWithException();
_conn.setAutoCommit(true);
} catch (SQLException e) {
s_logger.error("Unable to get a new db connection", e);
return "Unable to initialize a connection to the database for locking purposes: " + e;
}
if (_conn == null) {
return "Unable to initialize a connection to the database for locking purposes, shutdown this server!";
}
if (_executor != null) {
try {
_executor.shutdown();
} catch (Throwable th) {
s_logger.error("Unable to shutdown the executor", th);
}
}
_executor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("LockMasterConnectionKeepAlive"));
_executor.schedule(new KeepAliveTask(), 10, TimeUnit.SECONDS);
return "Success";
}
public boolean acquire(String key, int timeInSeconds) {
Thread th = Thread.currentThread();
String threadName = th.getName();
@ -161,7 +123,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
protected boolean increment(String key, String threadName, int threadId) {
PreparedStatement pstmt = null;
try {
pstmt = _conn.prepareStatement(INCREMENT_SQL);
pstmt = _concierge.conn().prepareStatement(INCREMENT_SQL);
pstmt.setString(1, key);
pstmt.setLong(2, _msId);
pstmt.setString(3, threadName);
@ -189,7 +151,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
long startTime = InaccurateClock.getTime();
try {
pstmt = _conn.prepareStatement(ACQUIRE_SQL);
pstmt = _concierge.conn().prepareStatement(ACQUIRE_SQL);
pstmt.setString(1, key);
pstmt.setLong(2, _msId);
pstmt.setString(3, threadName);
@ -227,7 +189,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
pstmt = _conn.prepareStatement(INQUIRE_SQL);
pstmt = _concierge.conn().prepareStatement(INQUIRE_SQL);
pstmt.setString(1, key);
rs = pstmt.executeQuery();
if (!rs.next()) {
@ -260,7 +222,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
s_logger.info("Cleaning up locks for " + msId);
PreparedStatement pstmt = null;
try {
pstmt = _conn.prepareStatement(CLEANUP_MGMT_LOCKS_SQL);
pstmt = _concierge.conn().prepareStatement(CLEANUP_MGMT_LOCKS_SQL);
pstmt.setLong(1, msId);
int rows = pstmt.executeUpdate();
s_logger.info("Released " + rows + " locks for " + msId);
@ -282,7 +244,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
String threadName = th.getName();
int threadId = System.identityHashCode(th);
try {
pstmt = _conn.prepareStatement(DECREMENT_SQL);
pstmt = _concierge.conn().prepareStatement(DECREMENT_SQL);
pstmt.setString(1, key);
pstmt.setLong(2, _msId);
pstmt.setString(3, threadName);
@ -294,7 +256,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
}
if (rows == 1) {
pstmt.close();
pstmt = _conn.prepareStatement(RELEASE_SQL);
pstmt = _concierge.conn().prepareStatement(RELEASE_SQL);
pstmt.setString(1, key);
pstmt.setLong(2, _msId);
int result = pstmt.executeUpdate();
@ -339,7 +301,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
pstmt = _conn.prepareStatement(sql);
pstmt = _concierge.conn().prepareStatement(sql);
if (msId != null) {
pstmt.setLong(1, msId);
}
@ -387,7 +349,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
pstmt = _conn.prepareStatement(SELECT_THREAD_LOCKS_SQL);
pstmt = _concierge.conn().prepareStatement(SELECT_THREAD_LOCKS_SQL);
pstmt.setLong(1, msId);
pstmt.setString(2, threadName);
rs = pstmt.executeQuery();
@ -414,7 +376,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
PreparedStatement pstmt = null;
try {
pstmt = _conn.prepareStatement(CLEANUP_THREAD_LOCKS_SQL);
pstmt = _concierge.conn().prepareStatement(CLEANUP_THREAD_LOCKS_SQL);
pstmt.setLong(1, _msId);
pstmt.setString(2, threadName);
pstmt.setInt(3, threadId);
@ -437,7 +399,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
s_logger.info("Releasing a lock from jMX lck-" + key);
PreparedStatement pstmt = null;
try {
pstmt = _conn.prepareStatement(RELEASE_LOCK_SQL);
pstmt = _concierge.conn().prepareStatement(RELEASE_LOCK_SQL);
pstmt.setString(1, key);
int rows = pstmt.executeUpdate();
return rows > 0;
@ -446,25 +408,4 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
return false;
}
}
protected class KeepAliveTask implements Runnable {
@Override
public void run() {
PreparedStatement pstmt = null; // Should this even be prepared everytime?
try {
pstmt = _conn.prepareStatement("SELECT 1");
pstmt.executeQuery();
} catch (Throwable th) {
s_logger.error("Unable to keep the db connection alive for locking purposes!", th);
} finally {
if (pstmt != null) {
try {
pstmt.close();
} catch (SQLException e) {
}
}
}
}
}
}

View File

@ -29,7 +29,5 @@ public interface MerovingianMBean {
boolean releaseLockAsLastResortAndIReallyKnowWhatIAmDoing(String key);
String resetDbConnection();
void cleanupForServer(long msId);
}