mirror of https://github.com/apache/cloudstack.git
use the new implementation of SG manager
This commit is contained in:
parent
8c6fe3a9af
commit
c5fea7a03c
|
|
@ -98,6 +98,7 @@ import com.cloud.network.router.VirtualNetworkApplianceManagerImpl;
|
|||
import com.cloud.network.rules.RulesManagerImpl;
|
||||
import com.cloud.network.rules.dao.PortForwardingRulesDaoImpl;
|
||||
import com.cloud.network.security.SecurityGroupManagerImpl;
|
||||
import com.cloud.network.security.SecurityGroupManagerImpl2;
|
||||
import com.cloud.network.security.dao.IngressRuleDaoImpl;
|
||||
import com.cloud.network.security.dao.SecurityGroupDaoImpl;
|
||||
import com.cloud.network.security.dao.SecurityGroupRulesDaoImpl;
|
||||
|
|
@ -301,7 +302,7 @@ public class DefaultComponentLibrary extends ComponentLibraryBase implements Com
|
|||
addManager("Template Manager", TemplateManagerImpl.class);
|
||||
addManager("Snapshot Manager", SnapshotManagerImpl.class);
|
||||
addManager("SnapshotScheduler", SnapshotSchedulerImpl.class);
|
||||
addManager("SecurityGroupManager", SecurityGroupManagerImpl.class);
|
||||
addManager("SecurityGroupManager", SecurityGroupManagerImpl2.class);
|
||||
addManager("DomainRouterManager", VirtualNetworkApplianceManagerImpl.class);
|
||||
addManager("EntityManager", EntityManagerImpl.class);
|
||||
addManager("LoadBalancingRulesManager", LoadBalancingRulesManagerImpl.class);
|
||||
|
|
|
|||
|
|
@ -151,7 +151,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
|
|||
private long _serverId;
|
||||
|
||||
private int _timeBetweenCleanups = TIME_BETWEEN_CLEANUPS; // seconds
|
||||
private int _numWorkerThreads = WORKER_THREAD_COUNT;
|
||||
protected int _numWorkerThreads = WORKER_THREAD_COUNT;
|
||||
private int _globalWorkLockTimeout = 300; // 5 minutes
|
||||
|
||||
private final GlobalLock _workLock = GlobalLock.getInternLock("SecurityGroupWork");
|
||||
|
|
@ -784,11 +784,17 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
|
|||
|
||||
|
||||
_serverId = ((ManagementServer) ComponentLocator.getComponent(ManagementServer.Name)).getId();
|
||||
|
||||
s_logger.info("SecurityGroupManager: num worker threads=" + _numWorkerThreads +
|
||||
", time between cleanups=" + _timeBetweenCleanups + " global lock timeout=" + _globalWorkLockTimeout);
|
||||
createThreadPools();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
protected void createThreadPools() {
|
||||
_executorPool = Executors.newScheduledThreadPool(_numWorkerThreads, new NamedThreadFactory("NWGRP"));
|
||||
_cleanupExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("NWGRP-Cleanup"));
|
||||
|
||||
s_logger.info("SecurityGroupManager: num worker threads=" + _numWorkerThreads + ", time between cleanups=" + _timeBetweenCleanups);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -23,13 +23,13 @@ import java.util.Set;
|
|||
import java.util.TreeSet;
|
||||
|
||||
import javax.ejb.Local;
|
||||
import javax.naming.ConfigurationException;
|
||||
|
||||
import com.cloud.agent.api.SecurityIngressRulesCmd;
|
||||
import com.cloud.agent.manager.Commands;
|
||||
import com.cloud.exception.AgentUnavailableException;
|
||||
import com.cloud.network.security.SecurityGroupWork.Step;
|
||||
import com.cloud.uservm.UserVm;
|
||||
import com.cloud.utils.db.DB;
|
||||
import com.cloud.utils.db.Transaction;
|
||||
import com.cloud.vm.VirtualMachine.State;
|
||||
|
||||
|
|
@ -44,7 +44,6 @@ public class SecurityGroupManagerImpl2 extends SecurityGroupManagerImpl {
|
|||
|
||||
|
||||
WorkerThread[] _workers;
|
||||
long _timeToSleep = 10000;
|
||||
|
||||
|
||||
protected class WorkerThread extends Thread {
|
||||
|
|
@ -54,12 +53,27 @@ public class SecurityGroupManagerImpl2 extends SecurityGroupManagerImpl {
|
|||
|
||||
@Override
|
||||
public void run() {
|
||||
work();
|
||||
while (true) {
|
||||
try{
|
||||
work();
|
||||
} catch (final Throwable th) {
|
||||
s_logger.error("SG Work: Caught this throwable, ", th);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void createThreadPools() {
|
||||
_workers = new WorkerThread[_numWorkerThreads];
|
||||
for (int i = 0; i < _workers.length; i++) {
|
||||
_workers[i] = new WorkerThread("SecGrp-Worker-" + i);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@DB
|
||||
public void scheduleRulesetUpdateToHosts(List<Long> affectedVms, boolean updateSeqno, Long delayMs) {
|
||||
if (affectedVms.size() == 0) {
|
||||
return;
|
||||
|
|
@ -77,47 +91,44 @@ public class SecurityGroupManagerImpl2 extends SecurityGroupManagerImpl {
|
|||
_workQueue.submitWorkForVms(workItems);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
|
||||
// TODO Auto-generated method stub
|
||||
return super.configure(name, params);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean start() {
|
||||
// TODO Auto-generated method stub
|
||||
return super.start();
|
||||
for (final WorkerThread thread : _workers) {
|
||||
thread.start();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void work() {
|
||||
while (true) {
|
||||
try {
|
||||
s_logger.trace("Checking the work queue");
|
||||
List<SecurityGroupWork> workItems = _workQueue.getWork(1);
|
||||
|
||||
for (SecurityGroupWork work: workItems) {
|
||||
if (s_logger.isTraceEnabled()) {
|
||||
s_logger.trace("Processing " + work.getInstanceId());
|
||||
}
|
||||
|
||||
try {
|
||||
VmRulesetLogVO rulesetLog = _rulesetLogDao.findByVmId(work.getInstanceId());
|
||||
if (rulesetLog == null) {
|
||||
s_logger.warn("Could not find ruleset log for vm " + work.getInstanceId());
|
||||
continue;
|
||||
}
|
||||
work.setLogsequenceNumber(rulesetLog.getLogsequence());
|
||||
sendRulesetUpdates(work);
|
||||
}catch (Exception e) {
|
||||
s_logger.error("Problem during SG work " + work, e);
|
||||
work.setStep(Step.Error);
|
||||
}
|
||||
s_logger.trace("Checking the work queue");
|
||||
List<SecurityGroupWork> workItems;
|
||||
try {
|
||||
workItems = _workQueue.getWork(1);
|
||||
for (SecurityGroupWork work: workItems) {
|
||||
if (s_logger.isTraceEnabled()) {
|
||||
s_logger.trace("Processing " + work.getInstanceId());
|
||||
}
|
||||
} catch (final Throwable th) {
|
||||
s_logger.error("Caught this throwable, ", th);
|
||||
}
|
||||
|
||||
try {
|
||||
VmRulesetLogVO rulesetLog = _rulesetLogDao.findByVmId(work.getInstanceId());
|
||||
if (rulesetLog == null) {
|
||||
s_logger.warn("Could not find ruleset log for vm " + work.getInstanceId());
|
||||
continue;
|
||||
}
|
||||
work.setLogsequenceNumber(rulesetLog.getLogsequence());
|
||||
sendRulesetUpdates(work);
|
||||
}catch (Exception e) {
|
||||
s_logger.error("Problem during SG work " + work, e);
|
||||
work.setStep(Step.Error);
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e1) {
|
||||
s_logger.warn("SG work: caught InterruptException", e1);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
protected void sendRulesetUpdates(SecurityGroupWork work){
|
||||
|
|
|
|||
|
|
@ -63,6 +63,7 @@ public class VmRulesetLogDaoImpl extends GenericDaoBase<VmRulesetLogVO, Long> im
|
|||
boolean success = true;
|
||||
try {
|
||||
stmtInsert = txn.prepareAutoCloseStatement(INSERT_OR_UPDATE);
|
||||
txn.start();
|
||||
for (Long vmId: workItems) {
|
||||
stmtInsert.setLong(1, vmId);
|
||||
stmtInsert.addBatch();
|
||||
|
|
|
|||
Loading…
Reference in New Issue