diff --git a/server/src/com/cloud/configuration/DefaultComponentLibrary.java b/server/src/com/cloud/configuration/DefaultComponentLibrary.java index d743cd53b69..b56e554894a 100755 --- a/server/src/com/cloud/configuration/DefaultComponentLibrary.java +++ b/server/src/com/cloud/configuration/DefaultComponentLibrary.java @@ -98,6 +98,7 @@ import com.cloud.network.router.VirtualNetworkApplianceManagerImpl; import com.cloud.network.rules.RulesManagerImpl; import com.cloud.network.rules.dao.PortForwardingRulesDaoImpl; import com.cloud.network.security.SecurityGroupManagerImpl; +import com.cloud.network.security.SecurityGroupManagerImpl2; import com.cloud.network.security.dao.IngressRuleDaoImpl; import com.cloud.network.security.dao.SecurityGroupDaoImpl; import com.cloud.network.security.dao.SecurityGroupRulesDaoImpl; @@ -301,7 +302,7 @@ public class DefaultComponentLibrary extends ComponentLibraryBase implements Com addManager("Template Manager", TemplateManagerImpl.class); addManager("Snapshot Manager", SnapshotManagerImpl.class); addManager("SnapshotScheduler", SnapshotSchedulerImpl.class); - addManager("SecurityGroupManager", SecurityGroupManagerImpl.class); + addManager("SecurityGroupManager", SecurityGroupManagerImpl2.class); addManager("DomainRouterManager", VirtualNetworkApplianceManagerImpl.class); addManager("EntityManager", EntityManagerImpl.class); addManager("LoadBalancingRulesManager", LoadBalancingRulesManagerImpl.class); diff --git a/server/src/com/cloud/network/security/SecurityGroupManagerImpl.java b/server/src/com/cloud/network/security/SecurityGroupManagerImpl.java index 4e8aadf3d9d..d8826fac47c 100755 --- a/server/src/com/cloud/network/security/SecurityGroupManagerImpl.java +++ b/server/src/com/cloud/network/security/SecurityGroupManagerImpl.java @@ -151,7 +151,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG private long _serverId; private int _timeBetweenCleanups = TIME_BETWEEN_CLEANUPS; // seconds - private int _numWorkerThreads = WORKER_THREAD_COUNT; + protected int _numWorkerThreads = WORKER_THREAD_COUNT; private int _globalWorkLockTimeout = 300; // 5 minutes private final GlobalLock _workLock = GlobalLock.getInternLock("SecurityGroupWork"); @@ -784,11 +784,17 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG _serverId = ((ManagementServer) ComponentLocator.getComponent(ManagementServer.Name)).getId(); + + s_logger.info("SecurityGroupManager: num worker threads=" + _numWorkerThreads + + ", time between cleanups=" + _timeBetweenCleanups + " global lock timeout=" + _globalWorkLockTimeout); + createThreadPools(); + + return true; + } + + protected void createThreadPools() { _executorPool = Executors.newScheduledThreadPool(_numWorkerThreads, new NamedThreadFactory("NWGRP")); _cleanupExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("NWGRP-Cleanup")); - - s_logger.info("SecurityGroupManager: num worker threads=" + _numWorkerThreads + ", time between cleanups=" + _timeBetweenCleanups); - return true; } @Override diff --git a/server/src/com/cloud/network/security/SecurityGroupManagerImpl2.java b/server/src/com/cloud/network/security/SecurityGroupManagerImpl2.java index 00f05fb4e34..5e56147c3f0 100644 --- a/server/src/com/cloud/network/security/SecurityGroupManagerImpl2.java +++ b/server/src/com/cloud/network/security/SecurityGroupManagerImpl2.java @@ -23,13 +23,13 @@ import java.util.Set; import java.util.TreeSet; import javax.ejb.Local; -import javax.naming.ConfigurationException; import com.cloud.agent.api.SecurityIngressRulesCmd; import com.cloud.agent.manager.Commands; import com.cloud.exception.AgentUnavailableException; import com.cloud.network.security.SecurityGroupWork.Step; import com.cloud.uservm.UserVm; +import com.cloud.utils.db.DB; import com.cloud.utils.db.Transaction; import com.cloud.vm.VirtualMachine.State; @@ -44,7 +44,6 @@ public class SecurityGroupManagerImpl2 extends SecurityGroupManagerImpl { WorkerThread[] _workers; - long _timeToSleep = 10000; protected class WorkerThread extends Thread { @@ -54,12 +53,27 @@ public class SecurityGroupManagerImpl2 extends SecurityGroupManagerImpl { @Override public void run() { - work(); + while (true) { + try{ + work(); + } catch (final Throwable th) { + s_logger.error("SG Work: Caught this throwable, ", th); + } + } } } + + @Override + protected void createThreadPools() { + _workers = new WorkerThread[_numWorkerThreads]; + for (int i = 0; i < _workers.length; i++) { + _workers[i] = new WorkerThread("SecGrp-Worker-" + i); + } + } @Override + @DB public void scheduleRulesetUpdateToHosts(List affectedVms, boolean updateSeqno, Long delayMs) { if (affectedVms.size() == 0) { return; @@ -77,47 +91,44 @@ public class SecurityGroupManagerImpl2 extends SecurityGroupManagerImpl { _workQueue.submitWorkForVms(workItems); } - @Override - public boolean configure(String name, Map params) throws ConfigurationException { - // TODO Auto-generated method stub - return super.configure(name, params); - } @Override public boolean start() { - // TODO Auto-generated method stub - return super.start(); + for (final WorkerThread thread : _workers) { + thread.start(); + } + return true; } @Override public void work() { - while (true) { - try { - s_logger.trace("Checking the work queue"); - List workItems = _workQueue.getWork(1); - - for (SecurityGroupWork work: workItems) { - if (s_logger.isTraceEnabled()) { - s_logger.trace("Processing " + work.getInstanceId()); - } - - try { - VmRulesetLogVO rulesetLog = _rulesetLogDao.findByVmId(work.getInstanceId()); - if (rulesetLog == null) { - s_logger.warn("Could not find ruleset log for vm " + work.getInstanceId()); - continue; - } - work.setLogsequenceNumber(rulesetLog.getLogsequence()); - sendRulesetUpdates(work); - }catch (Exception e) { - s_logger.error("Problem during SG work " + work, e); - work.setStep(Step.Error); - } + s_logger.trace("Checking the work queue"); + List workItems; + try { + workItems = _workQueue.getWork(1); + for (SecurityGroupWork work: workItems) { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Processing " + work.getInstanceId()); } - } catch (final Throwable th) { - s_logger.error("Caught this throwable, ", th); - } + + try { + VmRulesetLogVO rulesetLog = _rulesetLogDao.findByVmId(work.getInstanceId()); + if (rulesetLog == null) { + s_logger.warn("Could not find ruleset log for vm " + work.getInstanceId()); + continue; + } + work.setLogsequenceNumber(rulesetLog.getLogsequence()); + sendRulesetUpdates(work); + }catch (Exception e) { + s_logger.error("Problem during SG work " + work, e); + work.setStep(Step.Error); + } + } + } catch (InterruptedException e1) { + s_logger.warn("SG work: caught InterruptException", e1); } + + } protected void sendRulesetUpdates(SecurityGroupWork work){ diff --git a/server/src/com/cloud/network/security/dao/VmRulesetLogDaoImpl.java b/server/src/com/cloud/network/security/dao/VmRulesetLogDaoImpl.java index 1394646130a..bd2c76f5a6c 100644 --- a/server/src/com/cloud/network/security/dao/VmRulesetLogDaoImpl.java +++ b/server/src/com/cloud/network/security/dao/VmRulesetLogDaoImpl.java @@ -63,6 +63,7 @@ public class VmRulesetLogDaoImpl extends GenericDaoBase im boolean success = true; try { stmtInsert = txn.prepareAutoCloseStatement(INSERT_OR_UPDATE); + txn.start(); for (Long vmId: workItems) { stmtInsert.setLong(1, vmId); stmtInsert.addBatch();