More Managed Context changes

This commit is contained in:
Darren Shepherd 2013-09-30 15:13:55 -07:00
parent b8a467ef71
commit 7cac1bd67e
24 changed files with 111 additions and 66 deletions

View File

@ -26,7 +26,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
@ -36,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.naming.ConfigurationException;
import org.apache.cloudstack.managed.context.ManagedContextTimerTask;
import org.apache.log4j.Logger;
import com.cloud.agent.api.AgentControlAnswer;
@ -731,7 +731,7 @@ public class Agent implements HandlerFactory, IAgentControl {
}
}
public class WatchTask extends TimerTask {
public class WatchTask extends ManagedContextTimerTask {
protected Request _request;
protected Agent _agent;
protected Link _link;
@ -744,7 +744,7 @@ public class Agent implements HandlerFactory, IAgentControl {
}
@Override
public void run() {
protected void runInContext() {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Scheduling " + (_request instanceof Response ? "Ping" : "Watch Task"));
}
@ -760,7 +760,7 @@ public class Agent implements HandlerFactory, IAgentControl {
}
}
public class StartupTask extends TimerTask {
public class StartupTask extends ManagedContextTimerTask {
protected Link _link;
protected volatile boolean cancelled = false;
@ -782,7 +782,7 @@ public class Agent implements HandlerFactory, IAgentControl {
}
@Override
public synchronized void run() {
protected synchronized void runInContext() {
if (!cancelled) {
if (s_logger.isInfoEnabled()) {
s_logger.info("The startup command is now cancelled");

View File

@ -32,6 +32,7 @@ import java.util.Properties;
import javax.naming.ConfigurationException;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.log4j.Logger;
import com.cloud.agent.Agent.ExitStatus;
@ -357,8 +358,9 @@ public class ConsoleProxyResource extends ServerResourceBase implements
private void launchConsoleProxy(final byte[] ksBits, final String ksPassword, final String encryptorPassword) {
final Object resource = this;
if (_consoleProxyMain == null) {
_consoleProxyMain = new Thread(new Runnable() {
public void run() {
_consoleProxyMain = new Thread(new ManagedContextRunnable() {
@Override
protected void runInContext() {
try {
Class<?> consoleProxyClazz = Class.forName("com.cloud.consoleproxy.ConsoleProxy");
try {

View File

@ -200,14 +200,14 @@ public class CallContext {
s_logger.trace("Popping from NDC: " + contextId);
}
}
Stack<CallContext> stack = s_currentContextStack.get();
stack.pop();
if ( ! stack.isEmpty() ) {
s_currentContext.set(stack.peek());
}
return context;
}

View File

@ -35,6 +35,7 @@ import javax.annotation.PostConstruct;
import javax.inject.Inject;
import org.apache.axis2.AxisFault;
import org.apache.cloudstack.managed.context.ManagedContextTimerTask;
import org.apache.log4j.Logger;
import org.apache.log4j.xml.DOMConfigurator;
import org.springframework.stereotype.Component;
@ -280,10 +281,9 @@ public class ServiceProvider extends ManagerBase {
}
private TimerTask getHeartbeatTask() {
return new TimerTask() {
return new ManagedContextTimerTask() {
@Override
public void run() {
protected void runInContext() {
try {
mhost.setLastHeartbeatTime(DateHelper.currentGMTTime());
mhostDao.updateHeartBeat(mhost);

View File

@ -52,6 +52,22 @@
</property>
</bean>
<!--
Managed Context
-->
<bean id="ManagedContext" class="org.apache.cloudstack.managed.context.impl.DefaultManagedContext" >
<property name="listeners">
<list>
<bean class="org.apache.cloudstack.context.CallContextListener" />
</list>
</property>
</bean>
<bean class="org.apache.cloudstack.managed.context.ManagedContextRunnable" factory-method="initializeGlobalContext"
autowire-candidate="false" >
<constructor-arg><ref bean="ManagedContext"/></constructor-arg>
</bean>
<!--
RPC/Async/EventBus
-->

View File

@ -40,7 +40,7 @@ import org.apache.commons.httpclient.auth.AuthScope;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.params.HttpMethodParams;
import org.apache.log4j.Logger;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.cloudstack.storage.command.DownloadCommand.ResourceType;
import com.cloud.agent.api.storage.Proxy;
@ -52,7 +52,7 @@ import com.cloud.utils.UriUtils;
* Download a template file using HTTP
*
*/
public class HttpTemplateDownloader implements TemplateDownloader {
public class HttpTemplateDownloader extends ManagedContextRunnable implements TemplateDownloader {
public static final Logger s_logger = Logger.getLogger(HttpTemplateDownloader.class.getName());
private static final MultiThreadedHttpConnectionManager s_httpClientManager = new MultiThreadedHttpConnectionManager();
@ -350,7 +350,7 @@ public class HttpTemplateDownloader implements TemplateDownloader {
}
@Override
public void run() {
protected void runInContext() {
try {
download(resume, completionCallback);
} catch (Throwable t) {

View File

@ -50,6 +50,7 @@ import com.amazonaws.services.s3.model.StorageClass;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.Upload;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.cloudstack.storage.command.DownloadCommand.ResourceType;
import com.cloud.agent.api.storage.Proxy;
@ -62,7 +63,7 @@ import com.cloud.utils.UriUtils;
* Download a template file using HTTP
*
*/
public class S3TemplateDownloader implements TemplateDownloader {
public class S3TemplateDownloader extends ManagedContextRunnable implements TemplateDownloader {
public static final Logger s_logger = Logger.getLogger(S3TemplateDownloader.class.getName());
private static final MultiThreadedHttpConnectionManager s_httpClientManager = new MultiThreadedHttpConnectionManager();
@ -361,7 +362,7 @@ public class S3TemplateDownloader implements TemplateDownloader {
}
@Override
public void run() {
protected void runInContext() {
try {
download(resume, completionCallback);
} catch (Throwable t) {

View File

@ -18,11 +18,12 @@ package com.cloud.storage.template;
import java.io.File;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.log4j.Logger;
import com.cloud.storage.StorageLayer;
public abstract class TemplateDownloaderBase implements TemplateDownloader {
public abstract class TemplateDownloaderBase extends ManagedContextRunnable implements TemplateDownloader {
private static final Logger s_logger = Logger.getLogger(TemplateDownloaderBase.class);
protected String _downloadUrl;
@ -123,7 +124,7 @@ public abstract class TemplateDownloaderBase implements TemplateDownloader {
}
@Override
public void run() {
protected void runInContext() {
try {
download(_resume, _callback);
} catch (Exception e) {

View File

@ -31,6 +31,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.log4j.Logger;
import com.cloud.agent.Listener;
@ -497,14 +498,14 @@ public abstract class AgentAttache {
*/
protected abstract boolean isClosed();
protected class Alarm implements Runnable {
protected class Alarm extends ManagedContextRunnable {
long _seq;
public Alarm(long seq) {
_seq = seq;
}
@Override
public void run() {
protected void runInContext() {
try {
Listener listener = unregisterListener(_seq);
if (listener != null) {

View File

@ -1016,7 +1016,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
return attache;
}
protected class SimulateStartTask extends ManagedContextRunnable implements Runnable {
protected class SimulateStartTask extends ManagedContextRunnable {
ServerResource resource;
Map<String, String> details;
long id;
@ -1056,7 +1056,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
}
}
protected class HandleAgentConnectTask extends ManagedContextRunnable implements Runnable {
protected class HandleAgentConnectTask extends ManagedContextRunnable {
Link _link;
Command[] _cmds;
Request _request;

View File

@ -218,18 +218,18 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
private Runnable getClusterPduSendingTask() {
return new Runnable() {
return new ManagedContextRunnable() {
@Override
public void run() {
protected void runInContext() {
onSendingClusterPdu();
}
};
}
private Runnable getClusterPduNotificationTask() {
return new Runnable() {
return new ManagedContextRunnable() {
@Override
public void run() {
protected void runInContext() {
onNotifyingClusterPdu();
}
};
@ -290,9 +290,9 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
if(pdu == null)
continue;
_executor.execute(new Runnable() {
_executor.execute(new ManagedContextRunnable() {
@Override
public void run() {
protected void runInContext() {
if(pdu.getPduType() == ClusterServicePdu.PDU_TYPE_RESPONSE) {
ClusterServiceRequestPdu requestPdu = popRequestPdu(pdu.getAckSequenceId());
if(requestPdu != null) {
@ -529,9 +529,9 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
private Runnable getHeartbeatTask() {
return new Runnable() {
return new ManagedContextRunnable() {
@Override
public void run() {
protected void runInContext() {
Transaction txn = Transaction.open("ClusterHeartbeat");
try {
Profiler profiler = new Profiler();

View File

@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.management.StandardMBean;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.log4j.Logger;
import com.cloud.utils.concurrency.NamedThreadFactory;
@ -198,10 +199,9 @@ public class ConnectionConcierge {
_executor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("ConnectionConcierge"));
_executor.scheduleAtFixedRate(new Runnable() {
_executor.scheduleAtFixedRate(new ManagedContextRunnable() {
@Override
public void run() {
protected void runInContext() {
s_logger.trace("connection concierge keep alive task");
for (Map.Entry<String, ConnectionConcierge> entry : _conns.entrySet()) {
ConnectionConcierge concierge = entry.getValue();

View File

@ -27,6 +27,7 @@ import org.apache.cloudstack.framework.serializer.MessageSerializer;
import org.apache.cloudstack.framework.transport.TransportEndpoint;
import org.apache.cloudstack.framework.transport.TransportEndpointSite;
import org.apache.cloudstack.framework.transport.TransportProvider;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import com.cloud.utils.concurrency.NamedThreadFactory;
@ -62,10 +63,9 @@ public class ClientTransportProvider implements TransportProvider {
_executor = Executors.newFixedThreadPool(_poolSize, new NamedThreadFactory("Transport-Worker"));
_connection = new ClientTransportConnection(this);
_executor.execute(new Runnable() {
@Override
public void run() {
_executor.execute(new ManagedContextRunnable() {
@Override
protected void runInContext() {
try {
_connection.connect(_serverAddress, _serverPort);
} catch(Throwable e) {

View File

@ -31,6 +31,7 @@ import org.apache.cloudstack.framework.transport.TransportEndpoint;
import org.apache.cloudstack.framework.transport.TransportEndpointSite;
import org.apache.cloudstack.framework.transport.TransportPdu;
import org.apache.cloudstack.framework.transport.TransportProvider;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.log4j.Logger;
import com.cloud.utils.concurrency.NamedThreadFactory;
@ -132,11 +133,10 @@ public class ServerTransportProvider implements TransportProvider {
@Override
public void requestSiteOutput(final TransportEndpointSite site) {
_executor.execute(new Runnable() {
@Override
public void run() {
try {
_executor.execute(new ManagedContextRunnable() {
@Override
protected void runInContext() {
try {
site.processOutput();
site.ackOutputProcessSignal();
} catch(Throwable e) {

View File

@ -490,9 +490,9 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
private Runnable getExecutorRunnable(final AsyncJob job) {
return new Runnable() {
return new ManagedContextRunnable() {
@Override
public void run() {
protected void runInContext() {
Transaction txn = null;
long runNumber = getJobRunNumber();
@ -687,9 +687,9 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
private Runnable getHeartbeatTask() {
return new Runnable() {
return new ManagedContextRunnable() {
@Override
public void run() {
protected void runInContext() {
Transaction txn = Transaction.open("AsyncJobManagerImpl.getHeartbeatTask");
try {
List<SyncQueueItemVO> l = _queueMgr.dequeueFromAny(getMsid(), MAX_ONETIME_SCHEDULE_SIZE);

View File

@ -18,6 +18,7 @@
*/
package org.apache.cloudstack.managed.context;
import org.apache.cloudstack.managed.context.impl.DefaultManagedContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -26,13 +27,17 @@ public abstract class ManagedContextRunnable implements Runnable {
private static final int SLEEP_COUNT = 120;
private static final Logger log = LoggerFactory.getLogger(ManagedContextRunnable.class);
private static final ManagedContext DEFAULT_MANAGED_CONTEXT = new DefaultManagedContext();
private static ManagedContext context;
private static boolean managedContext = false;
/* This is slightly dirty, but the idea is that we only save the ManagedContext
* in a static global. Any ManagedContextListener can be a fully managed object
* and not have to rely on global statics
*/
public static ManagedContext initializeGlobalContext(ManagedContext context) {
setManagedContext(true);
return ManagedContextRunnable.context = context;
}
@ -43,13 +48,15 @@ public abstract class ManagedContextRunnable implements Runnable {
public void run() {
runInContext();
}
});
}
protected abstract void runInContext();
protected ManagedContext getContext() {
if ( ! managedContext )
return DEFAULT_MANAGED_CONTEXT;
for ( int i = 0 ; i < SLEEP_COUNT ; i++ ) {
if ( context == null ) {
try {
@ -67,4 +74,13 @@ public abstract class ManagedContextRunnable implements Runnable {
throw new RuntimeException("Failed to obtain ManagedContext");
}
}
public static boolean isManagedContext() {
return managedContext;
}
public static void setManagedContext(boolean managedContext) {
ManagedContextRunnable.managedContext = managedContext;
}
}

View File

@ -20,7 +20,9 @@
package org.apache.cloudstack.mom.rabbitmq;
import com.rabbitmq.client.*;
import org.apache.cloudstack.framework.events.*;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.log4j.Logger;
import com.cloud.utils.Ternary;
@ -28,6 +30,7 @@ import com.cloud.utils.component.ManagerBase;
import javax.ejb.Local;
import javax.naming.ConfigurationException;
import java.io.IOException;
import java.net.ConnectException;
import java.util.Map;
@ -493,12 +496,13 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus {
}
// retry logic to connect back to AMQP server after loss of connection
private class ReconnectionTask implements Runnable {
private class ReconnectionTask extends ManagedContextRunnable {
boolean connected = false;
Connection connection = null;
public void run() {
@Override
protected void runInContext() {
while (!connected) {
try {

View File

@ -20,7 +20,10 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.log4j.Logger;
import com.cloud.utils.script.Script;
import org.libvirt.Connect;
@ -68,10 +71,10 @@ public class KVMHAMonitor extends KVMHABase implements Runnable {
}
}
private class Monitor implements Runnable {
private class Monitor extends ManagedContextRunnable {
@Override
public void run() {
protected void runInContext() {
synchronized (_storagePool) {
for (String uuid : _storagePool.keySet()) {
NfsStoragePool primaryStoragePool = _storagePool.get(uuid);

View File

@ -31,7 +31,6 @@ import javax.inject.Inject;
import javax.naming.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.cloudstack.api.AddUcsManagerCmd;
import org.apache.cloudstack.api.AssociateUcsProfileToBladeCmd;
import org.apache.cloudstack.api.DeleteUcsManagerCmd;
@ -43,6 +42,7 @@ import org.apache.cloudstack.api.response.UcsBladeResponse;
import org.apache.cloudstack.api.response.UcsManagerResponse;
import org.apache.cloudstack.api.response.UcsProfileResponse;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import com.cloud.configuration.Config;
import com.cloud.dc.ClusterDetailsDao;

View File

@ -33,12 +33,10 @@ import org.springframework.stereotype.Component;
import org.apache.cloudstack.api.ApiConstants;
import org.apache.cloudstack.api.command.user.snapshot.CreateSnapshotCmd;
import org.apache.cloudstack.context.ServerContexts;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
import org.apache.cloudstack.framework.jobs.dao.AsyncJobDao;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
import org.apache.cloudstack.managed.context.ManagedContext;
import org.apache.cloudstack.managed.context.ManagedContextTimerTask;
import com.cloud.api.ApiDispatcher;

View File

@ -50,7 +50,6 @@ import org.apache.cloudstack.api.command.admin.user.DeleteUserCmd;
import org.apache.cloudstack.api.command.admin.user.RegisterCmd;
import org.apache.cloudstack.api.command.admin.user.UpdateUserCmd;
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.context.ServerContexts;
import org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;

View File

@ -61,7 +61,6 @@ import org.apache.cloudstack.api.command.user.vm.UpgradeVMCmd;
import org.apache.cloudstack.api.command.user.vmgroup.CreateVMGroupCmd;
import org.apache.cloudstack.api.command.user.vmgroup.DeleteVMGroupCmd;
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.context.ServerContexts;
import org.apache.cloudstack.engine.cloud.entity.api.VirtualMachineEntity;
import org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService;
import org.apache.cloudstack.engine.orchestration.service.VolumeOrchestrationService;

View File

@ -37,6 +37,7 @@ import javax.naming.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
import org.apache.cloudstack.managed.context.ManagedContext;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.cloudstack.usage.UsageTypes;
import org.springframework.stereotype.Component;
@ -1661,8 +1662,9 @@ public class UsageManagerImpl extends ManagerBase implements UsageManager, Runna
m_usageVMSnapshotDao.persist(vsVO);
}
private class Heartbeat implements Runnable {
public void run() {
private class Heartbeat extends ManagedContextRunnable {
@Override
protected void runInContext() {
Transaction usageTxn = Transaction.open(Transaction.USAGE_DB);
try {
if(!m_heartbeatLock.lock(3)) { // 3 second timeout
@ -1767,8 +1769,9 @@ public class UsageManagerImpl extends ManagerBase implements UsageManager, Runna
}
}
private class SanityCheck implements Runnable {
public void run() {
private class SanityCheck extends ManagedContextRunnable {
@Override
protected void runInContext() {
UsageSanityChecker usc = new UsageSanityChecker();
try {
String errors = usc.runSanityCheck();

View File

@ -23,6 +23,8 @@ import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.cloudstack.managed.context.ManagedContextTimerTask;
import org.apache.log4j.Logger;
public class VmwareContextPool {
@ -117,9 +119,9 @@ public class VmwareContextPool {
}
private TimerTask getTimerTask() {
return new TimerTask() {
@Override
public void run() {
return new ManagedContextTimerTask() {
@Override
protected void runInContext() {
try {
// doIdleCheck();