From cc5e459be0360f7f6d1ea54db2dcffa84ef5679d Mon Sep 17 00:00:00 2001 From: Chiradeep Vittal Date: Tue, 25 Oct 2011 12:10:14 -0700 Subject: [PATCH] bug 11336: limit the number of outstanding messages sent to a host to conserve memory --- .../src/com/cloud/configuration/Config.java | 2 + .../security/SecurityGroupListener.java | 18 +++ .../security/SecurityGroupManagerImpl2.java | 17 ++- .../security/SecurityGroupWorkTracker.java | 103 ++++++++++++++++++ 4 files changed, 138 insertions(+), 2 deletions(-) create mode 100644 server/src/com/cloud/network/security/SecurityGroupWorkTracker.java diff --git a/server/src/com/cloud/configuration/Config.java b/server/src/com/cloud/configuration/Config.java index 190de310710..c629b693c0f 100755 --- a/server/src/com/cloud/configuration/Config.java +++ b/server/src/com/cloud/configuration/Config.java @@ -84,6 +84,8 @@ public enum Config { SecurityGroupWorkCleanupInterval("Network", ManagementServer.class, Integer.class, "network.securitygroups.work.cleanup.interval", "120", "Time interval (seconds) in which finished work is cleaned up from the work table", null), SecurityGroupWorkerThreads("Network", ManagementServer.class, Integer.class, "network.securitygroups.workers.pool.size", "50", "Number of worker threads processing the security group update work queue", null), SecurityGroupWorkGlobalLockTimeout("Network", ManagementServer.class, Integer.class, "network.securitygroups.work.lock.timeout", "300", "Lock wait timeout (seconds) while updating the security group work queue", null), + SecurityGroupWorkPerAgentMaxQueueSize("Network", ManagementServer.class, Integer.class, "network.securitygroups.work.per.agent.queue.size", "100", "The number of outstanding security group work items that can be queued to a host. If exceeded, work items will get dropped to conserve memory. 
Security Group Sync will take care of ensuring that the host gets updated eventually", null), + FirewallRuleUiEnabled("Network", ManagementServer.class, Boolean.class, "firewall.rule.ui.enabled", "false", "enable/disable UI that separates firewall rules from NAT/LB rules", null), //VPN diff --git a/server/src/com/cloud/network/security/SecurityGroupListener.java b/server/src/com/cloud/network/security/SecurityGroupListener.java index 4611ac01679..9742bbe03b6 100755 --- a/server/src/com/cloud/network/security/SecurityGroupListener.java +++ b/server/src/com/cloud/network/security/SecurityGroupListener.java @@ -60,6 +60,8 @@ public class SecurityGroupListener implements Listener { SecurityGroupWorkDao _workDao; Map _vmFailureCounts = new ConcurrentHashMap(); + private SecurityGroupWorkTracker _workTracker; + public SecurityGroupListener(SecurityGroupManagerImpl securityGroupManager, AgentManager agentMgr, SecurityGroupWorkDao workDao) { @@ -110,6 +112,8 @@ public class SecurityGroupListener implements Listener { } } commandNum++; + if (_workTracker != null) + _workTracker.processAnswers(agentId, seq, answers); } } @@ -171,6 +175,9 @@ public class SecurityGroupListener implements Listener { //usually hypervisors that do not understand sec group rules. 
s_logger.debug("Unable to schedule network rules cleanup for host " + host.getId(), e); } + if (_workTracker != null) { + _workTracker.processConnect(host.getId()); + } } } @@ -184,11 +191,22 @@ public class SecurityGroupListener implements Listener { @Override public boolean processDisconnect(long agentId, Status state) { + if (_workTracker != null) { + _workTracker.processDisconnect(agentId); + } return true; } @Override public boolean processTimeout(long agentId, long seq) { + if (_workTracker != null) { + _workTracker.processTimeout(agentId, seq); + } return true; } + + + public void setWorkDispatcher(SecurityGroupWorkTracker workDispatcher) { + this._workTracker = workDispatcher; + } } diff --git a/server/src/com/cloud/network/security/SecurityGroupManagerImpl2.java b/server/src/com/cloud/network/security/SecurityGroupManagerImpl2.java index 55ca40c96c0..f0539d9e5e6 100644 --- a/server/src/com/cloud/network/security/SecurityGroupManagerImpl2.java +++ b/server/src/com/cloud/network/security/SecurityGroupManagerImpl2.java @@ -30,9 +30,11 @@ import javax.naming.ConfigurationException; import com.cloud.agent.api.SecurityIngressRulesCmd; import com.cloud.agent.manager.Commands; +import com.cloud.configuration.Config; import com.cloud.exception.AgentUnavailableException; import com.cloud.network.security.SecurityGroupWork.Step; import com.cloud.uservm.UserVm; +import com.cloud.utils.NumbersUtil; import com.cloud.utils.Profiler; import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.utils.mgmt.JmxUtil; @@ -46,6 +48,7 @@ import com.cloud.vm.VirtualMachine.State; @Local(value={ SecurityGroupManager.class, SecurityGroupService.class }) public class SecurityGroupManagerImpl2 extends SecurityGroupManagerImpl{ SecurityGroupWorkQueue _workQueue = new LocalSecurityGroupWorkQueue(); + SecurityGroupWorkTracker _workTracker; SecurityManagerMBeanImpl _mBean; WorkerThread[] _workers; @@ -166,9 +169,13 @@ public class SecurityGroupManagerImpl2 extends 
SecurityGroupManagerImpl{ if (s_logger.isTraceEnabled()) { s_logger.trace("SecurityGroupManager v2: found vm, " + userVmId + " state=" + vm.getState()); } - Map> rules = generateRulesForVM(userVmId); Long agentId = vm.getHostId(); if (agentId != null) { + if ( !_workTracker.canSend(agentId)) { + s_logger.trace("SecurityGroupManager v2: not sending ruleset update: too many messages outstanding"); + return; + } + Map> rules = generateRulesForVM(userVmId); SecurityIngressRulesCmd cmd = generateRulesetCmd(vm.getInstanceName(), vm.getPrivateIpAddress(), vm.getPrivateMacAddress(), vm.getId(), null, work.getLogsequenceNumber(), rules); @@ -185,6 +192,7 @@ public class SecurityGroupManagerImpl2 extends SecurityGroupManagerImpl{ } } catch (AgentUnavailableException e) { s_logger.debug("Unable to send updates for vm: " + userVmId + "(agentid=" + agentId + ")"); + _workTracker.handleException(agentId); } } } else { @@ -261,7 +269,12 @@ public class SecurityGroupManagerImpl2 extends SecurityGroupManagerImpl{ } catch (Exception e){ s_logger.error("Failed to register MBean", e); } - return super.configure(name, params); + boolean result = super.configure(name, params); + Map configs = _configDao.getConfiguration("Network", params); + int bufferLength = NumbersUtil.parseInt(configs.get(Config.SecurityGroupWorkPerAgentMaxQueueSize.key()), 100); + _workTracker = new SecurityGroupWorkTracker(_agentMgr, _answerListener, bufferLength); + _answerListener.setWorkDispatcher(_workTracker); + return result; } public void disableSchedulerForVm(Long vmId, boolean disable) { diff --git a/server/src/com/cloud/network/security/SecurityGroupWorkTracker.java b/server/src/com/cloud/network/security/SecurityGroupWorkTracker.java new file mode 100644 index 00000000000..fbf13bbc2cb --- /dev/null +++ b/server/src/com/cloud/network/security/SecurityGroupWorkTracker.java @@ -0,0 +1,103 @@ +package com.cloud.network.security; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; 
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.log4j.Logger;
+
+import com.cloud.agent.AgentManager;
+import com.cloud.agent.Listener;
+import com.cloud.agent.api.Answer;
+
+/**
+ * Keeps a per-agent count of messages that have been admitted by
+ * {@link #canSend(long)} and not yet released, and refuses further messages
+ * for an agent once that count has reached the configured buffer length.
+ *
+ * A count is released by {@link #processAnswers(long, long, Answer[])},
+ * {@link #processTimeout(long, long)} or {@link #handleException(long)}, and
+ * is reset by {@link #processConnect(long)} / {@link #processDisconnect(long)}.
+ *
+ * All updates of the counters are made while holding this object's monitor.
+ */
+public class SecurityGroupWorkTracker {
+    protected static final Logger s_logger = Logger.getLogger(SecurityGroupWorkTracker.class);
+    /** Total number of messages refused by {@link #canSend(long)} since construction. */
+    protected AtomicLong _discardCount = new AtomicLong(0);
+    // Stored for reference only; neither collaborator is used inside this class.
+    AgentManager _agentMgr;
+    Listener _answerListener;
+    /** Maximum number of outstanding messages allowed per agent. */
+    int _bufferLength;
+
+    /** agent id -> number of outstanding messages; a missing entry means zero. */
+    Map<Long, Integer> _unackedMessages = new ConcurrentHashMap<Long, Integer>();
+
+    /**
+     * @param agentMgr       agent manager (stored, not used by this class)
+     * @param answerListener listener (stored, not used by this class)
+     * @param bufferLength   maximum outstanding messages per agent; expected to be
+     *                       at least 1. NOTE: this is only checked by an
+     *                       {@code assert}, so it is not enforced unless the JVM
+     *                       runs with assertions enabled; with a value below 1
+     *                       {@link #canSend(long)} refuses every message.
+     */
+    public SecurityGroupWorkTracker(AgentManager agentMgr, Listener answerListener, int bufferLength) {
+        assert (bufferLength >= 1) : "SecurityGroupWorkTracker: Cannot have a zero length buffer";
+        this._agentMgr = agentMgr;
+        this._answerListener = answerListener;
+        this._bufferLength = bufferLength;
+    }
+
+    /**
+     * Asks for permission to send one more message to the given agent.
+     * When permission is granted the agent's outstanding count is incremented;
+     * when it is refused the discard counter is incremented instead.
+     *
+     * @param agentId id of the target agent
+     * @return {@code true} if the message may be sent, {@code false} if the
+     *         agent already has {@code bufferLength} messages outstanding
+     */
+    public boolean canSend(long agentId) {
+        synchronized (this) {
+            Integer outstanding = _unackedMessages.get(agentId);
+            int currLength = (outstanding == null) ? 0 : outstanding.intValue();
+            if (currLength + 1 > _bufferLength) {
+                long discarded = _discardCount.incrementAndGet();
+                // drop it on the floor
+                s_logger.debug("SecurityGroupManager: dropping a message because there are more than " + currLength
+                        + " outstanding messages, total dropped=" + discarded);
+                return false;
+            }
+            _unackedMessages.put(agentId, currLength + 1);
+        }
+        return true;
+    }
+
+    /**
+     * Releases one outstanding slot for the agent after a send failed.
+     *
+     * @param agentId id of the agent the send was intended for
+     */
+    public void handleException(long agentId) {
+        releaseOne(agentId);
+    }
+
+    /**
+     * Releases one outstanding slot for the agent. Exactly one slot is released
+     * per call; {@code seq} and {@code answers} are not inspected.
+     *
+     * @param agentId id of the answering agent
+     * @param seq     sequence number of the answered request (unused)
+     * @param answers answers received (unused)
+     */
+    public void processAnswers(long agentId, long seq, Answer[] answers) {
+        releaseOne(agentId);
+    }
+
+    /**
+     * Releases one outstanding slot for the agent after a request timed out.
+     *
+     * @param agentId id of the agent
+     * @param seq     sequence number of the timed-out request (unused)
+     */
+    public void processTimeout(long agentId, long seq) {
+        releaseOne(agentId);
+    }
+
+    /**
+     * Forgets the outstanding count of an agent that disconnected. The entry
+     * is removed rather than set to zero so that agents which never come back
+     * do not stay in the map; a missing entry is read as zero everywhere.
+     *
+     * @param agentId id of the disconnected agent
+     */
+    public void processDisconnect(long agentId) {
+        synchronized (this) {
+            _unackedMessages.remove(agentId);
+        }
+    }
+
+    /**
+     * Resets the outstanding count of a (re)connected agent to zero.
+     *
+     * @param agentId id of the connected agent
+     */
+    public void processConnect(long agentId) {
+        synchronized (this) {
+            _unackedMessages.put(agentId, 0);
+        }
+    }
+
+    /**
+     * @return total number of messages refused by {@link #canSend(long)}
+     */
+    public long getDiscardCount() {
+        return _discardCount.get();
+    }
+
+    /**
+     * @param agentId id of the agent
+     * @return the agent's current outstanding count, or 0 if none is recorded
+     */
+    public int getUnackedCount(long agentId) {
+        Integer outstanding = _unackedMessages.get(agentId);
+        return (outstanding == null) ? 0 : outstanding.intValue();
+    }
+
+    /**
+     * Decrements the agent's outstanding count by one, never going below zero
+     * and doing nothing when no count is recorded for the agent.
+     */
+    private void releaseOne(long agentId) {
+        synchronized (this) {
+            Integer outstanding = _unackedMessages.get(agentId);
+            if (outstanding != null && outstanding.intValue() != 0) {
+                _unackedMessages.put(agentId, outstanding.intValue() - 1);
+            }
+        }
+    }
+}