diff --git a/core/src/com/cloud/network/security/SecurityGroupWork.java b/core/src/com/cloud/network/security/SecurityGroupWork.java new file mode 100644 index 00000000000..c2e096c6450 --- /dev/null +++ b/core/src/com/cloud/network/security/SecurityGroupWork.java @@ -0,0 +1,39 @@ +/** + * Copyright (C) 2011 Citrix Systems, Inc. All rights reserved. + * + * This software is licensed under the GNU General Public License v3 or later. + * + * It is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + */ +package com.cloud.network.security; + +/** + * Work related to security groups for a vm + * + */ +public interface SecurityGroupWork { + + public enum Step { + Scheduled, + Processing, + Done, + Error + } + + Long getInstanceId(); + Long getLogsequenceNumber(); + Step getStep(); + void setStep(Step step); + public abstract void setLogsequenceNumber(Long logsequenceNumber); + +} diff --git a/core/src/com/cloud/network/security/SecurityGroupWorkVO.java b/core/src/com/cloud/network/security/SecurityGroupWorkVO.java index e95031e8aac..9171bbbf729 100644 --- a/core/src/com/cloud/network/security/SecurityGroupWorkVO.java +++ b/core/src/com/cloud/network/security/SecurityGroupWorkVO.java @@ -34,13 +34,8 @@ import com.cloud.utils.db.GenericDao; @Entity @Table(name="op_nwgrp_work") -public class SecurityGroupWorkVO { - public enum Step { - Scheduled, - Processing, - Done, - Error - } +public class SecurityGroupWorkVO implements SecurityGroupWork{ + @Id @GeneratedValue(strategy=GenerationType.IDENTITY) @@ -132,7 +127,8 @@ public class SecurityGroupWorkVO { return logsequenceNumber; } - public void setLogsequenceNumber(Long logsequenceNumber) { + @Override + public void setLogsequenceNumber(Long logsequenceNumber) { this.logsequenceNumber = logsequenceNumber; } diff --git a/server/src/com/cloud/network/security/LocalSecurityGroupWorkQueue.java b/server/src/com/cloud/network/security/LocalSecurityGroupWorkQueue.java new file mode 100644 index 00000000000..42cd42f430a --- /dev/null +++ b/server/src/com/cloud/network/security/LocalSecurityGroupWorkQueue.java @@ -0,0 +1,185 @@ +/** + * Copyright (C) 2011 Citrix Systems, Inc. All rights reserved. + * + * This software is licensed under the GNU General Public License v3 or later. + * + * It is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + */ +package com.cloud.network.security; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.log4j.Logger; + +import com.cloud.network.security.SecurityGroupWork.Step; + + +/** + * Security Group Work Queue that is not shared with other management servers + * + */ +public class LocalSecurityGroupWorkQueue implements SecurityGroupWorkQueue { + protected static Logger s_logger = Logger.getLogger(LocalSecurityGroupWorkQueue.class); + + protected TreeSet _currentWork = new TreeSet(); + private final ReentrantLock _lock = new ReentrantLock(); + private final Condition _notEmpty = _lock.newCondition(); + private final AtomicInteger _count = new AtomicInteger(0); + + public static class LocalSecurityGroupWork implements SecurityGroupWork, Comparable { + Long _logSequenceNumber; + Long _instanceId; + Step _step; + + public LocalSecurityGroupWork(Long instanceId, Long logSequence, Step step){ + this._instanceId = instanceId; + this._logSequenceNumber = logSequence; + this._step = step; + } + + @Override + public Long getInstanceId() { + return _instanceId; + } + + @Override + public Long getLogsequenceNumber() { + return _logSequenceNumber; + } + + @Override + public Step getStep() { + return _step; + } + + @Override + public void setStep(Step step) { + this._step = step; + } + + @Override + public void setLogsequenceNumber(Long logsequenceNumber) { + this._logSequenceNumber = logsequenceNumber; + + } + + @Override + public int compareTo(LocalSecurityGroupWork o) { + return this._instanceId.compareTo(o.getInstanceId()); + } + + } + + + @Override + public void submitWorkForVm(long vmId, long sequenceNumber) { + _lock.lock(); + try { + SecurityGroupWork work = new LocalSecurityGroupWork(vmId, sequenceNumber, Step.Scheduled); + boolean added = _currentWork.add(work); + if (added) + _count.incrementAndGet(); + } finally { + _lock.unlock(); + } + signalNotEmpty(); + + } + + + @Override + public int submitWorkForVms(Set vmIds) { + _lock.lock(); + int newWork = _count.get(); + try { + for (Long vmId: vmIds) { + SecurityGroupWork work = new LocalSecurityGroupWork(vmId, null, SecurityGroupWork.Step.Scheduled); + boolean added = _currentWork.add(work); + if (added) + _count.incrementAndGet(); + } + } finally { + newWork = _count.get() - newWork; + _lock.unlock(); + } + signalNotEmpty(); + return newWork; + } + + + @Override + public List getWork(int numberOfWorkItems) { + List work = new ArrayList(numberOfWorkItems); + _lock.lock(); + int i = 0; + try { + while (_count.get() == 0) { + _notEmpty.await(); + } + int n = Math.min(numberOfWorkItems, _count.get()); + while (i < n ) { + SecurityGroupWork w = _currentWork.first(); + w.setStep(Step.Processing); + work.add(w); + _currentWork.remove(w); + ++i; + } + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } finally { + int c = _count.addAndGet(-i); + if (c > 0) + _notEmpty.signal(); + _lock.unlock(); + } + return work; + + } + + private void signalNotEmpty() { + _lock.lock(); + try { + _notEmpty.signal(); + } finally { + _lock.unlock(); + } + } + + + @Override + public int size() { + return _count.get(); + } + + + @Override + public void clear() { + _lock.lock(); + try { + _currentWork.clear(); + _count.set(0); + } finally { + _lock.unlock(); + } + + } + + +} diff --git a/server/src/com/cloud/network/security/SecurityGroupListener.java b/server/src/com/cloud/network/security/SecurityGroupListener.java index 46b50657bfe..6136db13f4d 100755 --- a/server/src/com/cloud/network/security/SecurityGroupListener.java +++ b/server/src/com/cloud/network/security/SecurityGroupListener.java @@ -37,7 +37,7 @@ import com.cloud.agent.manager.Commands; import com.cloud.exception.AgentUnavailableException; import com.cloud.host.HostVO; import com.cloud.host.Status; -import com.cloud.network.security.SecurityGroupWorkVO.Step; +import com.cloud.network.security.SecurityGroupWork.Step; import com.cloud.network.security.dao.SecurityGroupWorkDao; /** diff --git a/server/src/com/cloud/network/security/SecurityGroupManagerImpl.java b/server/src/com/cloud/network/security/SecurityGroupManagerImpl.java index cd43a6c10d9..4e8aadf3d9d 100755 --- a/server/src/com/cloud/network/security/SecurityGroupManagerImpl.java +++ b/server/src/com/cloud/network/security/SecurityGroupManagerImpl.java @@ -42,17 +42,13 @@ import org.apache.log4j.Logger; import com.cloud.agent.AgentManager; import com.cloud.agent.api.NetworkRulesSystemVmCommand; -import com.cloud.agent.api.SecurityEgressRulesCmd; -import com.cloud.agent.api.SecurityEgressRulesCmd.EgressIpPortAndProto; import com.cloud.agent.api.SecurityIngressRulesCmd; import com.cloud.agent.api.SecurityIngressRulesCmd.IpPortAndProto; import com.cloud.agent.manager.Commands; -import com.cloud.api.commands.AuthorizeSecurityGroupEgressCmd; import com.cloud.api.commands.AuthorizeSecurityGroupIngressCmd; import com.cloud.api.commands.CreateSecurityGroupCmd; import com.cloud.api.commands.DeleteSecurityGroupCmd; import com.cloud.api.commands.ListSecurityGroupsCmd; -import com.cloud.api.commands.RevokeSecurityGroupEgressCmd; import com.cloud.api.commands.RevokeSecurityGroupIngressCmd; import com.cloud.configuration.Config; import com.cloud.configuration.dao.ConfigurationDao; @@ -68,8 +64,7 @@ import com.cloud.exception.PermissionDeniedException; import com.cloud.exception.ResourceInUseException; import com.cloud.hypervisor.Hypervisor.HypervisorType; import com.cloud.network.NetworkManager; -import com.cloud.network.security.SecurityGroupWorkVO.Step; -import com.cloud.network.security.dao.EgressRuleDao; +import com.cloud.network.security.SecurityGroupWork.Step; import com.cloud.network.security.dao.IngressRuleDao; import com.cloud.network.security.dao.SecurityGroupDao; import com.cloud.network.security.dao.SecurityGroupRulesDao; @@ -79,6 +74,7 @@ import com.cloud.network.security.dao.VmRulesetLogDao; import com.cloud.server.ManagementServer; import com.cloud.user.Account; import com.cloud.user.AccountManager; +import com.cloud.user.AccountVO; import com.cloud.user.UserContext; import com.cloud.user.dao.AccountDao; import com.cloud.uservm.UserVm; @@ -110,6 +106,8 @@ import com.cloud.vm.VirtualMachineManager; import com.cloud.vm.dao.UserVmDao; import com.cloud.vm.dao.VMInstanceDao; +import edu.emory.mathcs.backport.java.util.Collections; + @Local(value = { SecurityGroupManager.class, SecurityGroupService.class }) public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityGroupService, Manager, StateListener { public static final Logger s_logger = Logger.getLogger(SecurityGroupManagerImpl.class); @@ -119,8 +117,6 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG @Inject IngressRuleDao _ingressRuleDao; @Inject - EgressRuleDao _egressRuleDao; - @Inject SecurityGroupVMMapDao _securityGroupVMMapDao; @Inject SecurityGroupRulesDao _securityGroupRulesDao; @@ -323,41 +319,8 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG } } - protected Map> generateEgressRulesForVM(Long userVmId) { - Map> allowed = new TreeMap>(); - - List groupsForVm = _securityGroupVMMapDao.listByInstanceId(userVmId); - for (SecurityGroupVMMapVO mapVO : groupsForVm) { - List rules = _egressRuleDao.listBySecurityGroupId(mapVO.getSecurityGroupId()); - for (EgressRuleVO rule : rules) { - PortAndProto portAndProto = new PortAndProto(rule.getProtocol(), rule.getStartPort(), rule.getEndPort()); - Set cidrs = allowed.get(portAndProto); - if (cidrs == null) { - cidrs = new TreeSet(new CidrComparator()); - } - if (rule.getAllowedNetworkId() != null) { - List allowedInstances = _securityGroupVMMapDao.listBySecurityGroup(rule.getAllowedNetworkId(), State.Running); - for (SecurityGroupVMMapVO ngmapVO : allowedInstances) { - Nic defaultNic = _networkMgr.getDefaultNic(ngmapVO.getInstanceId()); - if (defaultNic != null) { - String cidr = defaultNic.getIp4Address(); - cidr = cidr + "/32"; - cidrs.add(cidr); - } - } - } else if (rule.getAllowedDestinationIpCidr() != null) { - cidrs.add(rule.getAllowedDestinationIpCidr()); - } - if (cidrs.size() > 0) { - allowed.put(portAndProto, cidrs); - } - } - } - - return allowed; - } - protected Map> generateIngressRulesForVM(Long userVmId) { + protected Map> generateRulesForVM(Long userVmId) { Map> allowed = new TreeMap>(); @@ -392,7 +355,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG return allowed; } - private String generateRulesetSignature(Map> allowed) { + protected String generateRulesetSignature(Map> allowed) { String ruleset = allowed.toString(); return DigestUtils.md5Hex(ruleset); } @@ -400,90 +363,79 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG protected void handleVmStarted(VMInstanceVO vm) { if (vm.getType() != VirtualMachine.Type.User || !isVmSecurityGroupEnabled(vm.getId())) return; - Set affectedVms = getAffectedVmsForVmStart(vm); + List affectedVms = getAffectedVmsForVmStart(vm); scheduleRulesetUpdateToHosts(affectedVms, true, null); } @DB - public void scheduleRulesetUpdateToHosts(Set affectedVms, boolean updateSeqno, Long delayMs) { + public void scheduleRulesetUpdateToHosts(List affectedVms, boolean updateSeqno, Long delayMs) { + if (affectedVms.size() == 0) { + return; + } + if (delayMs == null) { delayMs = new Long(100l); } + Collections.sort(affectedVms); if (s_logger.isTraceEnabled()) { s_logger.trace("Security Group Mgr: scheduling ruleset updates for " + affectedVms.size() + " vms"); } - if (affectedVms.size() == 0) { + boolean locked = _workLock.lock(_globalWorkLockTimeout); + if (!locked) { + s_logger.warn("Security Group Mgr: failed to acquire global work lock"); return; } - boolean locked = _workLock.lock(_globalWorkLockTimeout); - if (locked) { - if (s_logger.isTraceEnabled()) { - s_logger.trace("Security Group Mgr: acquired global work lock"); - } - try { - for (Long vmId : affectedVms) { - if (s_logger.isTraceEnabled()) { - s_logger.trace("Security Group Mgr: scheduling ruleset update for " + vmId); - } - VmRulesetLogVO log = null; - SecurityGroupWorkVO work = null; - //UserVm vm = null; - Transaction txn = null; - try { - txn = Transaction.currentTxn(); - txn.start(); - //vm = _userVMDao.acquireInLockTable(vmId); - //if (vm == null) { - //s_logger.warn("Failed to acquire lock on vm id " + vmId); - //continue; - //} - log = _rulesetLogDao.findByVmId(vmId); - if (log == null) { - log = new VmRulesetLogVO(vmId); - log = _rulesetLogDao.persist(log); - } - - if (log != null && updateSeqno) { - log.incrLogsequence(); - _rulesetLogDao.update(log.getId(), log); - } - work = _workDao.findByVmIdStep(vmId, Step.Scheduled); - if (work == null) { - work = new SecurityGroupWorkVO(vmId, null, null, SecurityGroupWorkVO.Step.Scheduled, null); - work = _workDao.persist(work); - if (s_logger.isTraceEnabled()) { - s_logger.trace("Security Group Mgr: created new work item for " + vmId); - } - } - - work.setLogsequenceNumber(log.getLogsequence()); - _workDao.update(work.getId(), work); - - } finally { - // if (vm != null) { - // _userVMDao.releaseFromLockTable(vmId); - // } - if (txn != null) - txn.commit(); - } - - _executorPool.schedule(new WorkerThread(), delayMs, TimeUnit.MILLISECONDS); - } - } finally { - _workLock.unlock(); + if (s_logger.isTraceEnabled()) { + s_logger.trace("Security Group Mgr: acquired global work lock"); + } + Transaction txn = Transaction.currentTxn(); + try { + txn.start(); + for (Long vmId : affectedVms) { if (s_logger.isTraceEnabled()) { - s_logger.trace("Security Group Mgr: released global work lock"); + s_logger.trace("Security Group Mgr: scheduling ruleset update for " + vmId); } + VmRulesetLogVO log = null; + SecurityGroupWorkVO work = null; + + log = _rulesetLogDao.findByVmId(vmId); + if (log == null) { + log = new VmRulesetLogVO(vmId); + log = _rulesetLogDao.persist(log); + } + + if (log != null && updateSeqno) { + log.incrLogsequence(); + _rulesetLogDao.update(log.getId(), log); + } + work = _workDao.findByVmIdStep(vmId, Step.Scheduled); + if (work == null) { + work = new SecurityGroupWorkVO(vmId, null, null, SecurityGroupWork.Step.Scheduled, null); + work = _workDao.persist(work); + if (s_logger.isTraceEnabled()) { + s_logger.trace("Security Group Mgr: created new work item for " + vmId + "; id = " + work.getId()); + } + } + + work.setLogsequenceNumber(log.getLogsequence()); + _workDao.update(work.getId(), work); + } + txn.commit(); + for (Long vmId : affectedVms) { + _executorPool.schedule(new WorkerThread(), delayMs, TimeUnit.MILLISECONDS); + } + } finally { + _workLock.unlock(); + if (s_logger.isTraceEnabled()) { + s_logger.trace("Security Group Mgr: released global work lock"); } - } else { - s_logger.warn("Security Group Mgr: failed to acquire global work lock"); } } - protected Set getAffectedVmsForVmStart(VMInstanceVO vm) { - Set affectedVms = new HashSet(); + protected List getAffectedVmsForVmStart(VMInstanceVO vm) { + List affectedVms = new ArrayList(); affectedVms.add(vm.getId()); List groupsForVm = _securityGroupVMMapDao.listByInstanceId(vm.getId()); // For each group, find the ingress rules that allow the group @@ -495,8 +447,8 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG return affectedVms; } - protected Set getAffectedVmsForVmStop(VMInstanceVO vm) { - Set affectedVms = new HashSet(); + protected List getAffectedVmsForVmStop(VMInstanceVO vm) { + List affectedVms = new ArrayList(); List groupsForVm = _securityGroupVMMapDao.listByInstanceId(vm.getId()); // For each group, find the ingress rules that allow the group for (SecurityGroupVMMapVO mapVO : groupsForVm) {// FIXME: use custom sql in the dao @@ -507,9 +459,9 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG return affectedVms; } - protected Set getAffectedVmsForIngressRules(List allowingRules) { + protected List getAffectedVmsForIngressRules(List allowingRules) { Set distinctGroups = new HashSet(); - Set affectedVms = new HashSet(); + List affectedVms = new ArrayList(); for (IngressRuleVO allowingRule : allowingRules) { distinctGroups.add(allowingRule.getSecurityGroupId()); @@ -521,7 +473,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG return affectedVms; } - protected SecurityIngressRulesCmd generateIngressRulesetCmd(String vmName, String guestIp, String guestMac, Long vmId, String signature, long seqnum, Map> rules) { + protected SecurityIngressRulesCmd generateRulesetCmd(String vmName, String guestIp, String guestMac, Long vmId, String signature, long seqnum, Map> rules) { List result = new ArrayList(); for (PortAndProto pAp : rules.keySet()) { Set cidrs = rules.get(pAp); @@ -532,23 +484,11 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG } return new SecurityIngressRulesCmd(guestIp, guestMac, vmName, vmId, signature, seqnum, result.toArray(new IpPortAndProto[result.size()])); } - - protected SecurityEgressRulesCmd generateEgressRulesetCmd(String vmName, String guestIp, String guestMac, Long vmId, String signature, long seqnum, Map> rules) { - List result = new ArrayList(); - for (PortAndProto pAp : rules.keySet()) { - Set cidrs = rules.get(pAp); - if (cidrs.size() > 0) { - EgressIpPortAndProto ipPortAndProto = new SecurityEgressRulesCmd.EgressIpPortAndProto(pAp.getProto(), pAp.getStartPort(), pAp.getEndPort(), cidrs.toArray(new String[cidrs.size()])); - result.add(ipPortAndProto); - } - } - return new SecurityEgressRulesCmd(guestIp, guestMac, vmName, vmId, signature, seqnum, result.toArray(new EgressIpPortAndProto[result.size()])); - } - + protected void handleVmStopped(VMInstanceVO vm) { if (vm.getType() != VirtualMachine.Type.User || !isVmSecurityGroupEnabled(vm.getId())) return; - Set affectedVms = getAffectedVmsForVmStop(vm); + List affectedVms = getAffectedVmsForVmStop(vm); scheduleRulesetUpdateToHosts(affectedVms, true, null); } @@ -568,7 +508,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG } } else { - Set affectedVms = new HashSet(); + List affectedVms = new ArrayList(); affectedVms.add(vm.getId()); scheduleRulesetUpdateToHosts(affectedVms, true, null); } @@ -738,7 +678,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG s_logger.debug("Added " + newRules.size() + " rules to security group " + securityGroup.getName()); } txn.commit(); - final Set affectedVms = new HashSet(); + final ArrayList affectedVms = new ArrayList(); affectedVms.addAll(_securityGroupVMMapDao.listVmIdsBySecurityGroup(securityGroup.getId())); scheduleRulesetUpdateToHosts(affectedVms, true, null); return newRules; @@ -784,7 +724,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG _ingressRuleDao.remove(id); s_logger.debug("revokeSecurityGroupIngress succeeded for ingress rule id: " + id); - final Set affectedVms = new HashSet(); + final ArrayList affectedVms = new ArrayList(); affectedVms.addAll(_securityGroupVMMapDao.listVmIdsBySecurityGroup(groupHandle.getId())); scheduleRulesetUpdateToHosts(affectedVms, true, null); @@ -800,232 +740,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG } } - @Override - @DB - @SuppressWarnings("rawtypes") - public List authorizeSecurityGroupEgress(AuthorizeSecurityGroupEgressCmd cmd) { - Long securityGroupId = cmd.getSecurityGroupId(); - String protocol = cmd.getProtocol(); - Integer startPort = cmd.getStartPort(); - Integer endPort = cmd.getEndPort(); - Integer icmpType = cmd.getIcmpType(); - Integer icmpCode = cmd.getIcmpCode(); - List cidrList = cmd.getCidrList(); - Map groupList = cmd.getUserSecurityGroupList(); - Integer startPortOrType = null; - Integer endPortOrCode = null; - // Validate parameters - SecurityGroup securityGroup = _securityGroupDao.findById(securityGroupId); - if (securityGroup == null) { - throw new InvalidParameterValueException("Unable to find security group by id " + securityGroupId); - } - - if (cidrList == null && groupList == null) { - throw new InvalidParameterValueException("At least one cidr or at least one security group needs to be specified"); - } - - Account caller = UserContext.current().getCaller(); - Account owner = _accountMgr.getAccount(securityGroup.getAccountId()); - - if (owner == null) { - throw new InvalidParameterValueException("Unable to find security group owner by id=" + securityGroup.getAccountId()); - } - - // Verify permissions - _accountMgr.checkAccess(caller, null, securityGroup); - Long domainId = owner.getDomainId(); - - if (protocol == null) { - protocol = NetUtils.ALL_PROTO; - } - - if (!NetUtils.isValidSecurityGroupProto(protocol)) { - throw new InvalidParameterValueException("Invalid protocol " + protocol); - } - if ("icmp".equalsIgnoreCase(protocol)) { - if ((icmpType == null) || (icmpCode == null)) { - throw new InvalidParameterValueException("Invalid ICMP type/code specified, icmpType = " + icmpType + ", icmpCode = " + icmpCode); - } - if (icmpType == -1 && icmpCode != -1) { - throw new InvalidParameterValueException("Invalid icmp type range"); - } - if (icmpCode > 255) { - throw new InvalidParameterValueException("Invalid icmp code "); - } - startPortOrType = icmpType; - endPortOrCode = icmpCode; - } else if (protocol.equals(NetUtils.ALL_PROTO)) { - if ((startPort != null) || (endPort != null)) { - throw new InvalidParameterValueException("Cannot specify startPort or endPort without specifying protocol"); - } - startPortOrType = 0; - endPortOrCode = 0; - } else { - if ((startPort == null) || (endPort == null)) { - throw new InvalidParameterValueException("Invalid port range specified, startPort = " + startPort + ", endPort = " + endPort); - } - if (startPort == 0 && endPort == 0) { - endPort = 65535; - } - if (startPort > endPort) { - throw new InvalidParameterValueException("Invalid port range " + startPort + ":" + endPort); - } - if (startPort > 65535 || endPort > 65535 || startPort < -1 || endPort < -1) { - throw new InvalidParameterValueException("Invalid port numbers " + startPort + ":" + endPort); - } - - if (startPort < 0 || endPort < 0) { - throw new InvalidParameterValueException("Invalid port range " + startPort + ":" + endPort); - } - startPortOrType = startPort; - endPortOrCode = endPort; - } - - protocol = protocol.toLowerCase(); - - List authorizedGroups = new ArrayList(); - if (groupList != null) { - Collection userGroupCollection = groupList.values(); - Iterator iter = userGroupCollection.iterator(); - while (iter.hasNext()) { - HashMap userGroup = (HashMap) iter.next(); - String group = (String) userGroup.get("group"); - String authorizedAccountName = (String) userGroup.get("account"); - - if ((group == null) || (authorizedAccountName == null)) { - throw new InvalidParameterValueException( - "Invalid user group specified, fields 'group' and 'account' cannot be null, please specify groups in the form: userGroupList[0].group=XXX&userGroupList[0].account=YYY"); - } - - Account authorizedAccount = _accountDao.findActiveAccount(authorizedAccountName, domainId); - if (authorizedAccount == null) { - throw new InvalidParameterValueException("Nonexistent account: " + authorizedAccountName + " when trying to authorize ingress for " + securityGroupId + ":" + protocol + ":" - + startPortOrType + ":" + endPortOrCode); - } - - SecurityGroupVO groupVO = _securityGroupDao.findByAccountAndName(authorizedAccount.getId(), group); - if (groupVO == null) { - throw new InvalidParameterValueException("Nonexistent group " + group + " for account " + authorizedAccountName + "/" + domainId + " is given, unable to authorize ingress."); - } - - // Check permissions - _accountMgr.checkAccess(caller, null, groupVO); - - authorizedGroups.add(groupVO); - } - } - - final Transaction txn = Transaction.currentTxn(); - final Set authorizedGroups2 = new TreeSet(new SecurityGroupVOComparator()); - - authorizedGroups2.addAll(authorizedGroups); // Ensure we don't re-lock the same row - txn.start(); - - // Prevents other threads/management servers from creating duplicate ingress rules - securityGroup = _securityGroupDao.acquireInLockTable(securityGroupId); - if (securityGroup == null) { - s_logger.warn("Could not acquire lock on network security group: id= " + securityGroupId); - return null; - } - List newRules = new ArrayList(); - try { - for (final SecurityGroupVO ngVO : authorizedGroups2) { - final Long ngId = ngVO.getId(); - // Don't delete the referenced group from under us - if (ngVO.getId() != securityGroup.getId()) { - final SecurityGroupVO tmpGrp = _securityGroupDao.lockRow(ngId, false); - if (tmpGrp == null) { - s_logger.warn("Failed to acquire lock on security group: " + ngId); - txn.rollback(); - return null; - } - } - EgressRuleVO egressRule = _egressRuleDao.findByProtoPortsAndAllowedGroupId(securityGroup.getId(), protocol, startPortOrType, endPortOrCode, ngVO.getId()); - if (egressRule != null) { - continue; // rule already exists. - } - egressRule = new EgressRuleVO(securityGroup.getId(), startPortOrType, endPortOrCode, protocol, ngVO.getId()); - egressRule = _egressRuleDao.persist(egressRule); - newRules.add(egressRule); - } - if (cidrList != null) { - for (String cidr : cidrList) { - EgressRuleVO egressRule = _egressRuleDao.findByProtoPortsAndCidr(securityGroup.getId(), protocol, startPortOrType, endPortOrCode, cidr); - if (egressRule != null) { - continue; - } - egressRule = new EgressRuleVO(securityGroup.getId(), startPortOrType, endPortOrCode, protocol, cidr); - egressRule = _egressRuleDao.persist(egressRule); - newRules.add(egressRule); - } - } - if (s_logger.isDebugEnabled()) { - s_logger.debug("Added " + newRules.size() + " rules to security group " + securityGroup.getName()); - } - txn.commit(); - final Set affectedVms = new HashSet(); - affectedVms.addAll(_securityGroupVMMapDao.listVmIdsBySecurityGroup(securityGroup.getId())); - scheduleRulesetUpdateToHosts(affectedVms, true, null); - return newRules; - } catch (Exception e) { - s_logger.warn("Exception caught when adding ingress rules ", e); - throw new CloudRuntimeException("Exception caught when adding ingress rules", e); - } finally { - if (securityGroup != null) { - _securityGroupDao.releaseFromLockTable(securityGroup.getId()); - } - } - } - - @Override - @DB - public boolean revokeSecurityGroupEgress(RevokeSecurityGroupEgressCmd cmd) { - // input validation - Account caller = UserContext.current().getCaller(); - Long id = cmd.getId(); - - IngressRuleVO rule = _ingressRuleDao.findById(id); - if (rule == null) { - s_logger.debug("Unable to find ingress rule with id " + id); - throw new InvalidParameterValueException("Unable to find ingress rule with id " + id); - } - - // Check permissions - SecurityGroup securityGroup = _securityGroupDao.findById(rule.getSecurityGroupId()); - _accountMgr.checkAccess(caller, null, securityGroup); - - SecurityGroupVO groupHandle = null; - final Transaction txn = Transaction.currentTxn(); - - try { - txn.start(); - // acquire lock on parent group (preserving this logic) - groupHandle = _securityGroupDao.acquireInLockTable(rule.getSecurityGroupId()); - if (groupHandle == null) { - s_logger.warn("Could not acquire lock on security group id: " + rule.getSecurityGroupId()); - return false; - } - - _ingressRuleDao.remove(id); - s_logger.debug("revokeSecurityGroupIngress succeeded for ingress rule id: " + id); - - final Set affectedVms = new HashSet(); - affectedVms.addAll(_securityGroupVMMapDao.listVmIdsBySecurityGroup(groupHandle.getId())); - scheduleRulesetUpdateToHosts(affectedVms, true, null); - - return true; - } catch (Exception e) { - s_logger.warn("Exception caught when deleting ingress rules ", e); - throw new CloudRuntimeException("Exception caught when deleting ingress rules", e); - } finally { - if (groupHandle != null) { - _securityGroupDao.releaseFromLockTable(groupHandle.getId()); - } - txn.commit(); - } - - } @Override @ActionEvent(eventType = EventTypes.EVENT_SECURITY_GROUP_CREATE, eventDescription = "creating security group") public SecurityGroupVO createSecurityGroup(CreateSecurityGroupCmd cmd) throws PermissionDeniedException, InvalidParameterValueException { @@ -1121,9 +836,10 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG if (s_logger.isDebugEnabled()) { s_logger.debug("Security Group work: found a job in done state, rescheduling for vm: " + userVmId); } - Set affectedVms = new HashSet(); + ArrayList affectedVms = new ArrayList(); affectedVms.add(userVmId); scheduleRulesetUpdateToHosts(affectedVms, true, _timeBetweenCleanups*1000l); + return; } UserVm vm = null; Long seqnum = null; @@ -1153,23 +869,11 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG seqnum = log.getLogsequence(); if (vm != null && vm.getState() == State.Running) { - Map> ingressRules = generateIngressRulesForVM(userVmId); - Map> egressRules = generateEgressRulesForVM(userVmId); + Map> rules = generateRulesForVM(userVmId); agentId = vm.getHostId(); if (agentId != null) { - _rulesetLogDao.findByVmId(work.getInstanceId()); - SecurityIngressRulesCmd ingressCmd = generateIngressRulesetCmd(vm.getInstanceName(), vm.getPrivateIpAddress(), vm.getPrivateMacAddress(), vm.getId(), generateRulesetSignature(ingressRules), seqnum, - ingressRules); - Commands ingressCmds = new Commands(ingressCmd); - try { - _agentMgr.send(agentId, ingressCmds, _answerListener); - } catch (AgentUnavailableException e) { - s_logger.debug("Unable to send updates for vm: " + userVmId + "(agentid=" + agentId + ")"); - _workDao.updateStep(work.getInstanceId(), seqnum, Step.Done); - } - - SecurityEgressRulesCmd cmd = generateEgressRulesetCmd(vm.getInstanceName(), vm.getPrivateIpAddress(), vm.getPrivateMacAddress(), vm.getId(), generateRulesetSignature(egressRules), seqnum, - egressRules); + SecurityIngressRulesCmd cmd = generateRulesetCmd(vm.getInstanceName(), vm.getPrivateIpAddress(), vm.getPrivateMacAddress(), vm.getId(), generateRulesetSignature(rules), seqnum, + rules); Commands cmds = new Commands(cmd); try { _agentMgr.send(agentId, cmds, _answerListener); @@ -1407,7 +1111,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG @Override public void fullSync(long agentId, HashMap> newGroupStates) { - Set affectedVms = new HashSet(); + ArrayList affectedVms = new ArrayList(); for (String vmName : newGroupStates.keySet()) { Long vmId = newGroupStates.get(vmName).first(); Long seqno = newGroupStates.get(vmName).second(); @@ -1438,7 +1142,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG List unfinished = _workDao.findUnfinishedWork(before); if (unfinished.size() > 0) { s_logger.info("Network Group Work cleanup found " + unfinished.size() + " unfinished work items older than " + before.toString()); - Set affectedVms = new HashSet(); + ArrayList affectedVms = new ArrayList(); for (SecurityGroupWorkVO work : unfinished) { affectedVms.add(work.getInstanceId()); work.setStep(Step.Error); diff --git a/server/src/com/cloud/network/security/SecurityGroupManagerImpl2.java b/server/src/com/cloud/network/security/SecurityGroupManagerImpl2.java new file mode 100644 index 00000000000..59bdd57c1fa --- /dev/null +++ b/server/src/com/cloud/network/security/SecurityGroupManagerImpl2.java @@ -0,0 +1,157 @@ +/** + * Copyright (C) 2011 Citrix Systems, Inc. All rights reserved. + * + * This software is licensed under the GNU General Public License v3 or later. + * + * It is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + */ +package com.cloud.network.security; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import javax.naming.ConfigurationException; + +import com.cloud.agent.api.SecurityIngressRulesCmd; +import com.cloud.agent.manager.Commands; +import com.cloud.exception.AgentUnavailableException; +import com.cloud.network.security.SecurityGroupWork.Step; +import com.cloud.uservm.UserVm; +import com.cloud.utils.db.Transaction; +import com.cloud.vm.VirtualMachine.State; + +/** + * Same as the base class -- except it uses the abstracted security group work queue + * + */ +public class SecurityGroupManagerImpl2 extends SecurityGroupManagerImpl { + + SecurityGroupWorkQueue _workQueue = new LocalSecurityGroupWorkQueue(); + + + WorkerThread[] _workers; + long _timeToSleep = 10000; + + + protected class WorkerThread extends Thread { + public WorkerThread(String name) { + super(name); + } + + @Override + public void run() { + work(); + } + + } + + @Override + public void scheduleRulesetUpdateToHosts(List affectedVms, boolean updateSeqno, Long delayMs) { + if (affectedVms.size() == 0) { + return; + } + if (s_logger.isTraceEnabled()) { + s_logger.trace("Security Group Mgr2: scheduling ruleset updates for " + affectedVms.size() + " vms"); + } + Set workItems = new TreeSet(); + workItems.addAll(affectedVms); + + Transaction txn = Transaction.currentTxn(); + txn.start(); + _rulesetLogDao.createOrUpdate(workItems); + txn.commit(); + _workQueue.submitWorkForVms(workItems); + notifyAll(); + + } + + @Override + public boolean configure(String name, Map params) throws ConfigurationException { + // TODO Auto-generated method stub + return super.configure(name, params); + } + + @Override + public boolean start() { + // TODO Auto-generated method stub + return super.start(); + } + + @Override + public void work() { + while (true) { + try { + s_logger.trace("Checking the work queue"); + List workItems = _workQueue.getWork(1); + + for (SecurityGroupWork work: workItems) { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Processing " + work.getInstanceId()); + } + + try { + VmRulesetLogVO rulesetLog = _rulesetLogDao.findByVmId(work.getInstanceId()); + if (rulesetLog == null) { + s_logger.warn("Could not find ruleset log for vm " + work.getInstanceId()); + continue; + } + work.setLogsequenceNumber(rulesetLog.getLogsequence()); + sendRulesetUpdates(work); + }catch (Exception e) { + s_logger.error("Problem during SG work " + work, e); + work.setStep(Step.Error); + } + } + } catch (final Throwable th) { + s_logger.error("Caught this throwable, ", th); + } + } + } + + protected void sendRulesetUpdates(SecurityGroupWork work){ + Long userVmId = work.getInstanceId(); + UserVm vm = _userVMDao.findById(userVmId); + + if (vm != null && vm.getState() == State.Running) { + Map> rules = generateRulesForVM(userVmId); + Long agentId = vm.getHostId(); + if (agentId != null) { + SecurityIngressRulesCmd cmd = generateRulesetCmd(vm.getInstanceName(), vm.getPrivateIpAddress(), + vm.getPrivateMacAddress(), vm.getId(), generateRulesetSignature(rules), + work.getLogsequenceNumber(), rules); + Commands cmds = new Commands(cmd); + try { + _agentMgr.send(agentId, cmds, _answerListener); + } catch (AgentUnavailableException e) { + s_logger.debug("Unable to send updates for vm: " + userVmId + "(agentid=" + agentId + ")"); + } + } + } else { + if (s_logger.isTraceEnabled()) { + if (vm != null) + s_logger.trace("No rules sent to vm " + vm + "state=" + vm.getState()); + else + s_logger.trace("Could not find vm: No rules sent to vm " + userVmId ); + } + } + } + + @Override + public void cleanupFinishedWork() { + //no-op + } + + +} diff --git a/server/src/com/cloud/network/security/SecurityGroupWorkQueue.java b/server/src/com/cloud/network/security/SecurityGroupWorkQueue.java new file mode 100644 index 00000000000..0fb0773d558 --- /dev/null +++ b/server/src/com/cloud/network/security/SecurityGroupWorkQueue.java @@ -0,0 +1,40 @@ +/** + * Copyright (C) 2011 Citrix Systems, Inc. All rights reserved. + * + * This software is licensed under the GNU General Public License v3 or later. + * + * It is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + */ +package com.cloud.network.security; + +import java.util.List; +import java.util.Set; + + +/** + * Security Group Work queue + * standard producer / consumer interface + * + */ +public interface SecurityGroupWorkQueue { + + void submitWorkForVm(long vmId, long sequenceNumber); + + int submitWorkForVms(Set vmIds); + + List getWork(int numberOfWorkItems); + + int size(); + + void clear(); +} diff --git a/server/src/com/cloud/network/security/dao/SecurityGroupWorkDao.java b/server/src/com/cloud/network/security/dao/SecurityGroupWorkDao.java index 3e5819a3263..43f7c34f3eb 100644 --- a/server/src/com/cloud/network/security/dao/SecurityGroupWorkDao.java +++ b/server/src/com/cloud/network/security/dao/SecurityGroupWorkDao.java @@ -21,12 +21,13 @@ package com.cloud.network.security.dao; import java.util.Date; import java.util.List; +import com.cloud.network.security.SecurityGroupWork; import com.cloud.network.security.SecurityGroupWorkVO; -import com.cloud.network.security.SecurityGroupWorkVO.Step; +import com.cloud.network.security.SecurityGroupWork.Step; import com.cloud.utils.db.GenericDao; public interface SecurityGroupWorkDao extends GenericDao { - SecurityGroupWorkVO findByVmId(long vmId, boolean taken); + SecurityGroupWork findByVmId(long vmId, boolean taken); SecurityGroupWorkVO findByVmIdStep(long vmId, Step step); diff --git a/server/src/com/cloud/network/security/dao/SecurityGroupWorkDaoImpl.java b/server/src/com/cloud/network/security/dao/SecurityGroupWorkDaoImpl.java index eea2da088f1..f40beb6df1b 100644 --- a/server/src/com/cloud/network/security/dao/SecurityGroupWorkDaoImpl.java +++ b/server/src/com/cloud/network/security/dao/SecurityGroupWorkDaoImpl.java @@ -26,8 +26,9 @@ import javax.ejb.Local; import org.apache.log4j.Logger; import com.cloud.ha.HaWorkVO; +import com.cloud.network.security.SecurityGroupWork; import com.cloud.network.security.SecurityGroupWorkVO; -import com.cloud.network.security.SecurityGroupWorkVO.Step; +import com.cloud.network.security.SecurityGroupWork.Step; import com.cloud.utils.db.DB; import com.cloud.utils.db.Filter; import com.cloud.utils.db.GenericDaoBase; @@ -91,7 +92,7 @@ public class SecurityGroupWorkDaoImpl extends GenericDaoBase sc = taken?VmIdTakenSearch.create():VmIdUnTakenSearch.create(); sc.setParameters("vmId", vmId); return findOneIncludingRemovedBy(sc); @@ -130,9 +131,9 @@ public class SecurityGroupWorkDaoImpl extends GenericDaoBase { VmRulesetLogVO findByVmId(long vmId); + + void createOrUpdate(Set workItems); } diff --git a/server/src/com/cloud/network/security/dao/VmRulesetLogDaoImpl.java b/server/src/com/cloud/network/security/dao/VmRulesetLogDaoImpl.java index cb536c2c421..1394646130a 100644 --- a/server/src/com/cloud/network/security/dao/VmRulesetLogDaoImpl.java +++ b/server/src/com/cloud/network/security/dao/VmRulesetLogDaoImpl.java @@ -18,17 +18,27 @@ package com.cloud.network.security.dao; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Set; + import javax.ejb.Local; +import org.apache.log4j.Logger; + import com.cloud.network.security.VmRulesetLogVO; import com.cloud.utils.db.GenericDaoBase; import com.cloud.utils.db.SearchBuilder; import com.cloud.utils.db.SearchCriteria; +import com.cloud.utils.db.Transaction; @Local(value={VmRulesetLogDao.class}) -public class VmRulesetLogDaoImpl extends GenericDaoBase implements VmRulesetLogDao { +public class VmRulesetLogDaoImpl extends GenericDaoBase implements VmRulesetLogDao { + protected static Logger s_logger = Logger.getLogger(VmRulesetLogDaoImpl.class); private SearchBuilder VmIdSearch; - + private String INSERT_OR_UPDATE = "INSERT INTO op_vm_ruleset_log (instance_id, created, logsequence) " + + " VALUES(?, now(), 1) ON DUPLICATE KEY UPDATE logsequence=logsequence+1"; + protected VmRulesetLogDaoImpl() { VmIdSearch = createSearchBuilder(); @@ -43,6 +53,39 @@ public class VmRulesetLogDaoImpl extends GenericDaoBase im SearchCriteria sc = VmIdSearch.create(); sc.setParameters("vmId", vmId); return findOneIncludingRemovedBy(sc); + } + + @Override + public void createOrUpdate(Set workItems) { + Transaction txn = Transaction.currentTxn(); + PreparedStatement stmtInsert = null; + int [] queryResult = null; + boolean success = true; + try { + stmtInsert = txn.prepareAutoCloseStatement(INSERT_OR_UPDATE); + for (Long vmId: workItems) { + stmtInsert.setLong(1, vmId); + stmtInsert.addBatch(); + } + queryResult = stmtInsert.executeBatch(); + + txn.commit(); + if (s_logger.isTraceEnabled()) + s_logger.trace("Updated or inserted " + workItems.size() + " log items"); + } catch (SQLException e) { + s_logger.warn("Failed to execute batch update statement for ruleset log"); + success = false; + } + if (!success && queryResult != null) { + Long [] arrayItems = new Long[workItems.size()]; + workItems.toArray(arrayItems); + for (int i=0; i < queryResult.length; i++) { + if (queryResult[i] < 0 ) { + s_logger.debug("Batch query update failed for vm " + arrayItems[i]); + } + } + } + return; } diff --git a/server/test/com/cloud/network/security/SecurityGroupQueueTest.java b/server/test/com/cloud/network/security/SecurityGroupQueueTest.java new file mode 100644 index 00000000000..8cf0da187f7 --- /dev/null +++ b/server/test/com/cloud/network/security/SecurityGroupQueueTest.java @@ -0,0 +1,165 @@ +package com.cloud.network.security; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import junit.framework.TestCase; + +public class SecurityGroupQueueTest extends TestCase { + public final static SecurityGroupWorkQueue queue = new LocalSecurityGroupWorkQueue(); + + + public static class Producer implements Runnable { + int _maxVmId = 0; + int _newWorkQueued=0; + Set vmIds = new HashSet(); + + public Producer(int maxVmId) { + this._maxVmId = maxVmId; + for (long i=1; i <= _maxVmId; i++) { + vmIds.add(i); + } + } + + public void run() { + _newWorkQueued = queue.submitWorkForVms(vmIds); + } + + public int getNewWork() { + return _newWorkQueued; + } + + public int getTotalWork() { + return _maxVmId; + } + } + + public static class Consumer implements Runnable { + private int _numJobsToDequeue = 0; + private int _numJobsDequeued = 0; + + public Consumer(int numJobsToDequeu) { + this._numJobsToDequeue = numJobsToDequeu; + } + + public void run() { + List result = queue.getWork(_numJobsToDequeue); + this._numJobsDequeued = result.size(); + } + + int getNumJobsToDequeue() { + return _numJobsToDequeue; + } + + int getNumJobsDequeued() { + return _numJobsDequeued; + } + } + + public void testNumJobsEqToNumVms1() { + queue.clear(); + final int numProducers = 50; + Thread [] pThreads = new Thread[numProducers]; + + Producer [] producers = new Producer[numProducers]; + int numProduced = 0; + + for (int i=0; i < numProducers; i++) { + producers[i] = new Producer(i+1); + pThreads[i] = new Thread(producers[i]); + numProduced += i+1; + pThreads[i].start(); + } + for (int i=0; i < numProducers ; i++) { + try { + pThreads[i].join(); + } catch (InterruptedException ie){ + ie.printStackTrace(); + } + } + System.out.println("Num Vms= " + numProducers + " Queue size = " + queue.size()); + assert(numProducers == queue.size()); + } + + public void testNumJobsEqToNumVms2() { + queue.clear(); + + final int numProducers = 50; + Thread [] pThreads = new Thread[numProducers]; + + Producer [] producers = new Producer[numProducers]; + int numProduced = 0; + int maxVmId = 10000; + for (int i=0; i < numProducers; i++) { + producers[i] = new Producer(maxVmId); + pThreads[i] = new Thread(producers[i]); + numProduced += i+1; + pThreads[i].start(); + } + for (int i=0; i < numProducers ; i++) { + try { + pThreads[i].join(); + } catch (InterruptedException ie){ + ie.printStackTrace(); + } + } + System.out.println("Num Vms= " + maxVmId + " Queue size = " + queue.size()); + assert(maxVmId == queue.size()); + } + + public void testDequeueOneJob() { + queue.clear(); + + final int numProducers = 2; + final int numConsumers = 5; + final int maxVmId = 200; + + Thread [] pThreads = new Thread[numProducers]; + Thread [] cThreads = new Thread[numConsumers]; + + + Consumer [] consumers = new Consumer[numConsumers]; + Producer [] producers = new Producer[numProducers]; + + int numProduced = 0; + for (int i=0; i < numConsumers; i++) { + consumers[i] = new Consumer(1); + cThreads[i] = new Thread(consumers[i]); + cThreads[i].start(); + } + for (int i=0; i < numProducers; i++) { + producers[i] = new Producer(maxVmId); + pThreads[i] = new Thread(producers[i]); + numProduced += maxVmId; + pThreads[i].start(); + } + for (int i=0; i < numConsumers ; i++) { + try { + cThreads[i].join(); + } catch (InterruptedException ie){ + ie.printStackTrace(); + } + } +// try { +// Thread.sleep(2000); +// } catch (InterruptedException e) { +// // TODO Auto-generated catch block +// e.printStackTrace(); +// } + int totalDequeued = 0; + for (int i=0; i < numConsumers; i++) { + //System.out.println("Consumer " + i + " ask to dequeue " + consumers[i].getNumJobsToDequeue() + ", dequeued " + consumers[i].getNumJobsDequeued()); + totalDequeued += consumers[i].getNumJobsDequeued(); + } + int totalQueued = 0; + for (int i=0; i < numProducers; i++) { + //System.out.println("Producer " + i + " ask to queue " + producers[i].getTotalWork() + ", queued " + producers[i].getNewWork()); + totalQueued += producers[i].getNewWork(); + } + System.out.println("Total jobs dequeued = " + totalDequeued + ", num queued=" + totalQueued + " queue current size=" + queue.size()); + assert(totalDequeued == numConsumers); + assert(totalQueued - totalDequeued == queue.size()); + } + +} diff --git a/server/test/com/cloud/utils/db/GlobalLockTest.java b/server/test/com/cloud/utils/db/GlobalLockTest.java new file mode 100644 index 00000000000..274b9015faf --- /dev/null +++ b/server/test/com/cloud/utils/db/GlobalLockTest.java @@ -0,0 +1,59 @@ +package com.cloud.utils.db; + +import org.apache.log4j.Logger; + +import junit.framework.Assert; + +import com.cloud.utils.Profiler; +import com.cloud.utils.testcase.Log4jEnabledTestCase; + +public class GlobalLockTest extends Log4jEnabledTestCase{ + public static final Logger s_logger = Logger.getLogger(GlobalLockTest.class); + private final static GlobalLock _workLock = GlobalLock.getInternLock("SecurityGroupWork"); + public static class Worker implements Runnable { + int id = 0; + int timeoutSeconds = 10; + int jobDuration = 2; + public Worker(int id, int timeout, int duration) { + this.id = id; + timeoutSeconds = timeout; + jobDuration = duration; + } + public void run() { + boolean locked = false; + try { + Profiler p = new Profiler(); + p.start(); + locked = _workLock.lock(timeoutSeconds); + p.stop(); + System.out.println("Thread " + id + " waited " + p.getDuration() + " ms, locked=" + locked); + if (locked) { + Thread.sleep(jobDuration*1000); + } + } catch (InterruptedException e) { + } finally { + if (locked) { + boolean unlocked = _workLock.unlock(); + System.out.println("Thread " + id + " unlocked=" + unlocked); + } + } + } + } + + public void testTimeout() { + Thread [] pool = new Thread[50]; + for (int i=0; i < pool.length; i++) { + pool[i] = new Thread(new Worker(i, 5, 3)); + } + for (int i=0; i < pool.length; i++) { + pool[i].start(); + } + for (int i=0; i < pool.length; i++) { + try { + pool[i].join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } +}