New security group mgr WIP : memory based queueing

This commit is contained in:
Chiradeep Vittal 2011-08-22 22:49:27 -07:00
parent 9c660c192f
commit abf4754503
13 changed files with 783 additions and 389 deletions

View File

@ -0,0 +1,39 @@
/**
* Copyright (C) 2011 Citrix Systems, Inc. All rights reserved.
*
* This software is licensed under the GNU General Public License v3 or later.
*
* It is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package com.cloud.network.security;
/**
* Work related to security groups for a vm
*
*/
public interface SecurityGroupWork {
public enum Step {
Scheduled,
Processing,
Done,
Error
}
Long getInstanceId();
Long getLogsequenceNumber();
Step getStep();
void setStep(Step step);
public abstract void setLogsequenceNumber(Long logsequenceNumber);
}

View File

@ -34,13 +34,8 @@ import com.cloud.utils.db.GenericDao;
@Entity
@Table(name="op_nwgrp_work")
public class SecurityGroupWorkVO {
public enum Step {
Scheduled,
Processing,
Done,
Error
}
public class SecurityGroupWorkVO implements SecurityGroupWork{
@Id
@GeneratedValue(strategy=GenerationType.IDENTITY)
@ -132,7 +127,8 @@ public class SecurityGroupWorkVO {
return logsequenceNumber;
}
public void setLogsequenceNumber(Long logsequenceNumber) {
@Override
public void setLogsequenceNumber(Long logsequenceNumber) {
this.logsequenceNumber = logsequenceNumber;
}

View File

@ -0,0 +1,185 @@
/**
* Copyright (C) 2011 Citrix Systems, Inc. All rights reserved.
*
* This software is licensed under the GNU General Public License v3 or later.
*
* It is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package com.cloud.network.security;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
import com.cloud.network.security.SecurityGroupWork.Step;
/**
* Security Group Work Queue that is not shared with other management servers
*
*/
public class LocalSecurityGroupWorkQueue implements SecurityGroupWorkQueue {
protected static Logger s_logger = Logger.getLogger(LocalSecurityGroupWorkQueue.class);
protected TreeSet<SecurityGroupWork> _currentWork = new TreeSet<SecurityGroupWork>();
private final ReentrantLock _lock = new ReentrantLock();
private final Condition _notEmpty = _lock.newCondition();
private final AtomicInteger _count = new AtomicInteger(0);
public static class LocalSecurityGroupWork implements SecurityGroupWork, Comparable<LocalSecurityGroupWork> {
Long _logSequenceNumber;
Long _instanceId;
Step _step;
public LocalSecurityGroupWork(Long instanceId, Long logSequence, Step step){
this._instanceId = instanceId;
this._logSequenceNumber = logSequence;
this._step = step;
}
@Override
public Long getInstanceId() {
return _instanceId;
}
@Override
public Long getLogsequenceNumber() {
return _logSequenceNumber;
}
@Override
public Step getStep() {
return _step;
}
@Override
public void setStep(Step step) {
this._step = step;
}
@Override
public void setLogsequenceNumber(Long logsequenceNumber) {
this._logSequenceNumber = logsequenceNumber;
}
@Override
public int compareTo(LocalSecurityGroupWork o) {
return this._instanceId.compareTo(o.getInstanceId());
}
}
@Override
public void submitWorkForVm(long vmId, long sequenceNumber) {
_lock.lock();
try {
SecurityGroupWork work = new LocalSecurityGroupWork(vmId, sequenceNumber, Step.Scheduled);
boolean added = _currentWork.add(work);
if (added)
_count.incrementAndGet();
} finally {
_lock.unlock();
}
signalNotEmpty();
}
@Override
public int submitWorkForVms(Set<Long> vmIds) {
_lock.lock();
int newWork = _count.get();
try {
for (Long vmId: vmIds) {
SecurityGroupWork work = new LocalSecurityGroupWork(vmId, null, SecurityGroupWork.Step.Scheduled);
boolean added = _currentWork.add(work);
if (added)
_count.incrementAndGet();
}
} finally {
newWork = _count.get() - newWork;
_lock.unlock();
}
signalNotEmpty();
return newWork;
}
@Override
public List<SecurityGroupWork> getWork(int numberOfWorkItems) {
List<SecurityGroupWork> work = new ArrayList<SecurityGroupWork>(numberOfWorkItems);
_lock.lock();
int i = 0;
try {
while (_count.get() == 0) {
_notEmpty.await();
}
int n = Math.min(numberOfWorkItems, _count.get());
while (i < n ) {
SecurityGroupWork w = _currentWork.first();
w.setStep(Step.Processing);
work.add(w);
_currentWork.remove(w);
++i;
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
int c = _count.addAndGet(-i);
if (c > 0)
_notEmpty.signal();
_lock.unlock();
}
return work;
}
private void signalNotEmpty() {
_lock.lock();
try {
_notEmpty.signal();
} finally {
_lock.unlock();
}
}
@Override
public int size() {
return _count.get();
}
@Override
public void clear() {
_lock.lock();
try {
_currentWork.clear();
_count.set(0);
} finally {
_lock.unlock();
}
}
}

View File

@ -37,7 +37,7 @@ import com.cloud.agent.manager.Commands;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.host.HostVO;
import com.cloud.host.Status;
import com.cloud.network.security.SecurityGroupWorkVO.Step;
import com.cloud.network.security.SecurityGroupWork.Step;
import com.cloud.network.security.dao.SecurityGroupWorkDao;
/**

View File

@ -42,17 +42,13 @@ import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
import com.cloud.agent.api.NetworkRulesSystemVmCommand;
import com.cloud.agent.api.SecurityEgressRulesCmd;
import com.cloud.agent.api.SecurityEgressRulesCmd.EgressIpPortAndProto;
import com.cloud.agent.api.SecurityIngressRulesCmd;
import com.cloud.agent.api.SecurityIngressRulesCmd.IpPortAndProto;
import com.cloud.agent.manager.Commands;
import com.cloud.api.commands.AuthorizeSecurityGroupEgressCmd;
import com.cloud.api.commands.AuthorizeSecurityGroupIngressCmd;
import com.cloud.api.commands.CreateSecurityGroupCmd;
import com.cloud.api.commands.DeleteSecurityGroupCmd;
import com.cloud.api.commands.ListSecurityGroupsCmd;
import com.cloud.api.commands.RevokeSecurityGroupEgressCmd;
import com.cloud.api.commands.RevokeSecurityGroupIngressCmd;
import com.cloud.configuration.Config;
import com.cloud.configuration.dao.ConfigurationDao;
@ -68,8 +64,7 @@ import com.cloud.exception.PermissionDeniedException;
import com.cloud.exception.ResourceInUseException;
import com.cloud.hypervisor.Hypervisor.HypervisorType;
import com.cloud.network.NetworkManager;
import com.cloud.network.security.SecurityGroupWorkVO.Step;
import com.cloud.network.security.dao.EgressRuleDao;
import com.cloud.network.security.SecurityGroupWork.Step;
import com.cloud.network.security.dao.IngressRuleDao;
import com.cloud.network.security.dao.SecurityGroupDao;
import com.cloud.network.security.dao.SecurityGroupRulesDao;
@ -79,6 +74,7 @@ import com.cloud.network.security.dao.VmRulesetLogDao;
import com.cloud.server.ManagementServer;
import com.cloud.user.Account;
import com.cloud.user.AccountManager;
import com.cloud.user.AccountVO;
import com.cloud.user.UserContext;
import com.cloud.user.dao.AccountDao;
import com.cloud.uservm.UserVm;
@ -110,6 +106,8 @@ import com.cloud.vm.VirtualMachineManager;
import com.cloud.vm.dao.UserVmDao;
import com.cloud.vm.dao.VMInstanceDao;
import edu.emory.mathcs.backport.java.util.Collections;
@Local(value = { SecurityGroupManager.class, SecurityGroupService.class })
public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityGroupService, Manager, StateListener<State, VirtualMachine.Event, VirtualMachine> {
public static final Logger s_logger = Logger.getLogger(SecurityGroupManagerImpl.class);
@ -119,8 +117,6 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
@Inject
IngressRuleDao _ingressRuleDao;
@Inject
EgressRuleDao _egressRuleDao;
@Inject
SecurityGroupVMMapDao _securityGroupVMMapDao;
@Inject
SecurityGroupRulesDao _securityGroupRulesDao;
@ -323,41 +319,8 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
}
}
protected Map<PortAndProto, Set<String>> generateEgressRulesForVM(Long userVmId) {
Map<PortAndProto, Set<String>> allowed = new TreeMap<PortAndProto, Set<String>>();
List<SecurityGroupVMMapVO> groupsForVm = _securityGroupVMMapDao.listByInstanceId(userVmId);
for (SecurityGroupVMMapVO mapVO : groupsForVm) {
List<EgressRuleVO> rules = _egressRuleDao.listBySecurityGroupId(mapVO.getSecurityGroupId());
for (EgressRuleVO rule : rules) {
PortAndProto portAndProto = new PortAndProto(rule.getProtocol(), rule.getStartPort(), rule.getEndPort());
Set<String> cidrs = allowed.get(portAndProto);
if (cidrs == null) {
cidrs = new TreeSet<String>(new CidrComparator());
}
if (rule.getAllowedNetworkId() != null) {
List<SecurityGroupVMMapVO> allowedInstances = _securityGroupVMMapDao.listBySecurityGroup(rule.getAllowedNetworkId(), State.Running);
for (SecurityGroupVMMapVO ngmapVO : allowedInstances) {
Nic defaultNic = _networkMgr.getDefaultNic(ngmapVO.getInstanceId());
if (defaultNic != null) {
String cidr = defaultNic.getIp4Address();
cidr = cidr + "/32";
cidrs.add(cidr);
}
}
} else if (rule.getAllowedDestinationIpCidr() != null) {
cidrs.add(rule.getAllowedDestinationIpCidr());
}
if (cidrs.size() > 0) {
allowed.put(portAndProto, cidrs);
}
}
}
return allowed;
}
protected Map<PortAndProto, Set<String>> generateIngressRulesForVM(Long userVmId) {
protected Map<PortAndProto, Set<String>> generateRulesForVM(Long userVmId) {
Map<PortAndProto, Set<String>> allowed = new TreeMap<PortAndProto, Set<String>>();
@ -392,7 +355,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
return allowed;
}
private String generateRulesetSignature(Map<PortAndProto, Set<String>> allowed) {
protected String generateRulesetSignature(Map<PortAndProto, Set<String>> allowed) {
String ruleset = allowed.toString();
return DigestUtils.md5Hex(ruleset);
}
@ -400,90 +363,79 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
protected void handleVmStarted(VMInstanceVO vm) {
if (vm.getType() != VirtualMachine.Type.User || !isVmSecurityGroupEnabled(vm.getId()))
return;
Set<Long> affectedVms = getAffectedVmsForVmStart(vm);
List<Long> affectedVms = getAffectedVmsForVmStart(vm);
scheduleRulesetUpdateToHosts(affectedVms, true, null);
}
@DB
public void scheduleRulesetUpdateToHosts(Set<Long> affectedVms, boolean updateSeqno, Long delayMs) {
public void scheduleRulesetUpdateToHosts(List<Long> affectedVms, boolean updateSeqno, Long delayMs) {
if (affectedVms.size() == 0) {
return;
}
if (delayMs == null) {
delayMs = new Long(100l);
}
Collections.sort(affectedVms);
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr: scheduling ruleset updates for " + affectedVms.size() + " vms");
}
if (affectedVms.size() == 0) {
boolean locked = _workLock.lock(_globalWorkLockTimeout);
if (!locked) {
s_logger.warn("Security Group Mgr: failed to acquire global work lock");
return;
}
boolean locked = _workLock.lock(_globalWorkLockTimeout);
if (locked) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr: acquired global work lock");
}
try {
for (Long vmId : affectedVms) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr: scheduling ruleset update for " + vmId);
}
VmRulesetLogVO log = null;
SecurityGroupWorkVO work = null;
//UserVm vm = null;
Transaction txn = null;
try {
txn = Transaction.currentTxn();
txn.start();
//vm = _userVMDao.acquireInLockTable(vmId);
//if (vm == null) {
//s_logger.warn("Failed to acquire lock on vm id " + vmId);
//continue;
//}
log = _rulesetLogDao.findByVmId(vmId);
if (log == null) {
log = new VmRulesetLogVO(vmId);
log = _rulesetLogDao.persist(log);
}
if (log != null && updateSeqno) {
log.incrLogsequence();
_rulesetLogDao.update(log.getId(), log);
}
work = _workDao.findByVmIdStep(vmId, Step.Scheduled);
if (work == null) {
work = new SecurityGroupWorkVO(vmId, null, null, SecurityGroupWorkVO.Step.Scheduled, null);
work = _workDao.persist(work);
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr: created new work item for " + vmId);
}
}
work.setLogsequenceNumber(log.getLogsequence());
_workDao.update(work.getId(), work);
} finally {
// if (vm != null) {
// _userVMDao.releaseFromLockTable(vmId);
// }
if (txn != null)
txn.commit();
}
_executorPool.schedule(new WorkerThread(), delayMs, TimeUnit.MILLISECONDS);
}
} finally {
_workLock.unlock();
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr: acquired global work lock");
}
Transaction txn = Transaction.currentTxn();
try {
txn.start();
for (Long vmId : affectedVms) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr: released global work lock");
s_logger.trace("Security Group Mgr: scheduling ruleset update for " + vmId);
}
VmRulesetLogVO log = null;
SecurityGroupWorkVO work = null;
log = _rulesetLogDao.findByVmId(vmId);
if (log == null) {
log = new VmRulesetLogVO(vmId);
log = _rulesetLogDao.persist(log);
}
if (log != null && updateSeqno) {
log.incrLogsequence();
_rulesetLogDao.update(log.getId(), log);
}
work = _workDao.findByVmIdStep(vmId, Step.Scheduled);
if (work == null) {
work = new SecurityGroupWorkVO(vmId, null, null, SecurityGroupWork.Step.Scheduled, null);
work = _workDao.persist(work);
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr: created new work item for " + vmId + "; id = " + work.getId());
}
}
work.setLogsequenceNumber(log.getLogsequence());
_workDao.update(work.getId(), work);
}
txn.commit();
for (Long vmId : affectedVms) {
_executorPool.schedule(new WorkerThread(), delayMs, TimeUnit.MILLISECONDS);
}
} finally {
_workLock.unlock();
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr: released global work lock");
}
} else {
s_logger.warn("Security Group Mgr: failed to acquire global work lock");
}
}
protected Set<Long> getAffectedVmsForVmStart(VMInstanceVO vm) {
Set<Long> affectedVms = new HashSet<Long>();
protected List<Long> getAffectedVmsForVmStart(VMInstanceVO vm) {
List<Long> affectedVms = new ArrayList<Long>();
affectedVms.add(vm.getId());
List<SecurityGroupVMMapVO> groupsForVm = _securityGroupVMMapDao.listByInstanceId(vm.getId());
// For each group, find the ingress rules that allow the group
@ -495,8 +447,8 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
return affectedVms;
}
protected Set<Long> getAffectedVmsForVmStop(VMInstanceVO vm) {
Set<Long> affectedVms = new HashSet<Long>();
protected List<Long> getAffectedVmsForVmStop(VMInstanceVO vm) {
List<Long> affectedVms = new ArrayList<Long>();
List<SecurityGroupVMMapVO> groupsForVm = _securityGroupVMMapDao.listByInstanceId(vm.getId());
// For each group, find the ingress rules that allow the group
for (SecurityGroupVMMapVO mapVO : groupsForVm) {// FIXME: use custom sql in the dao
@ -507,9 +459,9 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
return affectedVms;
}
protected Set<Long> getAffectedVmsForIngressRules(List<IngressRuleVO> allowingRules) {
protected List<Long> getAffectedVmsForIngressRules(List<IngressRuleVO> allowingRules) {
Set<Long> distinctGroups = new HashSet<Long>();
Set<Long> affectedVms = new HashSet<Long>();
List<Long> affectedVms = new ArrayList<Long>();
for (IngressRuleVO allowingRule : allowingRules) {
distinctGroups.add(allowingRule.getSecurityGroupId());
@ -521,7 +473,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
return affectedVms;
}
protected SecurityIngressRulesCmd generateIngressRulesetCmd(String vmName, String guestIp, String guestMac, Long vmId, String signature, long seqnum, Map<PortAndProto, Set<String>> rules) {
protected SecurityIngressRulesCmd generateRulesetCmd(String vmName, String guestIp, String guestMac, Long vmId, String signature, long seqnum, Map<PortAndProto, Set<String>> rules) {
List<IpPortAndProto> result = new ArrayList<IpPortAndProto>();
for (PortAndProto pAp : rules.keySet()) {
Set<String> cidrs = rules.get(pAp);
@ -532,23 +484,11 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
}
return new SecurityIngressRulesCmd(guestIp, guestMac, vmName, vmId, signature, seqnum, result.toArray(new IpPortAndProto[result.size()]));
}
protected SecurityEgressRulesCmd generateEgressRulesetCmd(String vmName, String guestIp, String guestMac, Long vmId, String signature, long seqnum, Map<PortAndProto, Set<String>> rules) {
List<EgressIpPortAndProto> result = new ArrayList<EgressIpPortAndProto>();
for (PortAndProto pAp : rules.keySet()) {
Set<String> cidrs = rules.get(pAp);
if (cidrs.size() > 0) {
EgressIpPortAndProto ipPortAndProto = new SecurityEgressRulesCmd.EgressIpPortAndProto(pAp.getProto(), pAp.getStartPort(), pAp.getEndPort(), cidrs.toArray(new String[cidrs.size()]));
result.add(ipPortAndProto);
}
}
return new SecurityEgressRulesCmd(guestIp, guestMac, vmName, vmId, signature, seqnum, result.toArray(new EgressIpPortAndProto[result.size()]));
}
protected void handleVmStopped(VMInstanceVO vm) {
if (vm.getType() != VirtualMachine.Type.User || !isVmSecurityGroupEnabled(vm.getId()))
return;
Set<Long> affectedVms = getAffectedVmsForVmStop(vm);
List<Long> affectedVms = getAffectedVmsForVmStop(vm);
scheduleRulesetUpdateToHosts(affectedVms, true, null);
}
@ -568,7 +508,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
}
} else {
Set<Long> affectedVms = new HashSet<Long>();
List<Long> affectedVms = new ArrayList<Long>();
affectedVms.add(vm.getId());
scheduleRulesetUpdateToHosts(affectedVms, true, null);
}
@ -738,7 +678,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
s_logger.debug("Added " + newRules.size() + " rules to security group " + securityGroup.getName());
}
txn.commit();
final Set<Long> affectedVms = new HashSet<Long>();
final ArrayList<Long> affectedVms = new ArrayList<Long>();
affectedVms.addAll(_securityGroupVMMapDao.listVmIdsBySecurityGroup(securityGroup.getId()));
scheduleRulesetUpdateToHosts(affectedVms, true, null);
return newRules;
@ -784,7 +724,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
_ingressRuleDao.remove(id);
s_logger.debug("revokeSecurityGroupIngress succeeded for ingress rule id: " + id);
final Set<Long> affectedVms = new HashSet<Long>();
final ArrayList<Long> affectedVms = new ArrayList<Long>();
affectedVms.addAll(_securityGroupVMMapDao.listVmIdsBySecurityGroup(groupHandle.getId()));
scheduleRulesetUpdateToHosts(affectedVms, true, null);
@ -800,232 +740,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
}
}
@Override
@DB
@SuppressWarnings("rawtypes")
public List<EgressRuleVO> authorizeSecurityGroupEgress(AuthorizeSecurityGroupEgressCmd cmd) {
Long securityGroupId = cmd.getSecurityGroupId();
String protocol = cmd.getProtocol();
Integer startPort = cmd.getStartPort();
Integer endPort = cmd.getEndPort();
Integer icmpType = cmd.getIcmpType();
Integer icmpCode = cmd.getIcmpCode();
List<String> cidrList = cmd.getCidrList();
Map groupList = cmd.getUserSecurityGroupList();
Integer startPortOrType = null;
Integer endPortOrCode = null;
// Validate parameters
SecurityGroup securityGroup = _securityGroupDao.findById(securityGroupId);
if (securityGroup == null) {
throw new InvalidParameterValueException("Unable to find security group by id " + securityGroupId);
}
if (cidrList == null && groupList == null) {
throw new InvalidParameterValueException("At least one cidr or at least one security group needs to be specified");
}
Account caller = UserContext.current().getCaller();
Account owner = _accountMgr.getAccount(securityGroup.getAccountId());
if (owner == null) {
throw new InvalidParameterValueException("Unable to find security group owner by id=" + securityGroup.getAccountId());
}
// Verify permissions
_accountMgr.checkAccess(caller, null, securityGroup);
Long domainId = owner.getDomainId();
if (protocol == null) {
protocol = NetUtils.ALL_PROTO;
}
if (!NetUtils.isValidSecurityGroupProto(protocol)) {
throw new InvalidParameterValueException("Invalid protocol " + protocol);
}
if ("icmp".equalsIgnoreCase(protocol)) {
if ((icmpType == null) || (icmpCode == null)) {
throw new InvalidParameterValueException("Invalid ICMP type/code specified, icmpType = " + icmpType + ", icmpCode = " + icmpCode);
}
if (icmpType == -1 && icmpCode != -1) {
throw new InvalidParameterValueException("Invalid icmp type range");
}
if (icmpCode > 255) {
throw new InvalidParameterValueException("Invalid icmp code ");
}
startPortOrType = icmpType;
endPortOrCode = icmpCode;
} else if (protocol.equals(NetUtils.ALL_PROTO)) {
if ((startPort != null) || (endPort != null)) {
throw new InvalidParameterValueException("Cannot specify startPort or endPort without specifying protocol");
}
startPortOrType = 0;
endPortOrCode = 0;
} else {
if ((startPort == null) || (endPort == null)) {
throw new InvalidParameterValueException("Invalid port range specified, startPort = " + startPort + ", endPort = " + endPort);
}
if (startPort == 0 && endPort == 0) {
endPort = 65535;
}
if (startPort > endPort) {
throw new InvalidParameterValueException("Invalid port range " + startPort + ":" + endPort);
}
if (startPort > 65535 || endPort > 65535 || startPort < -1 || endPort < -1) {
throw new InvalidParameterValueException("Invalid port numbers " + startPort + ":" + endPort);
}
if (startPort < 0 || endPort < 0) {
throw new InvalidParameterValueException("Invalid port range " + startPort + ":" + endPort);
}
startPortOrType = startPort;
endPortOrCode = endPort;
}
protocol = protocol.toLowerCase();
List<SecurityGroupVO> authorizedGroups = new ArrayList<SecurityGroupVO>();
if (groupList != null) {
Collection userGroupCollection = groupList.values();
Iterator iter = userGroupCollection.iterator();
while (iter.hasNext()) {
HashMap userGroup = (HashMap) iter.next();
String group = (String) userGroup.get("group");
String authorizedAccountName = (String) userGroup.get("account");
if ((group == null) || (authorizedAccountName == null)) {
throw new InvalidParameterValueException(
"Invalid user group specified, fields 'group' and 'account' cannot be null, please specify groups in the form: userGroupList[0].group=XXX&userGroupList[0].account=YYY");
}
Account authorizedAccount = _accountDao.findActiveAccount(authorizedAccountName, domainId);
if (authorizedAccount == null) {
throw new InvalidParameterValueException("Nonexistent account: " + authorizedAccountName + " when trying to authorize ingress for " + securityGroupId + ":" + protocol + ":"
+ startPortOrType + ":" + endPortOrCode);
}
SecurityGroupVO groupVO = _securityGroupDao.findByAccountAndName(authorizedAccount.getId(), group);
if (groupVO == null) {
throw new InvalidParameterValueException("Nonexistent group " + group + " for account " + authorizedAccountName + "/" + domainId + " is given, unable to authorize ingress.");
}
// Check permissions
_accountMgr.checkAccess(caller, null, groupVO);
authorizedGroups.add(groupVO);
}
}
final Transaction txn = Transaction.currentTxn();
final Set<SecurityGroupVO> authorizedGroups2 = new TreeSet<SecurityGroupVO>(new SecurityGroupVOComparator());
authorizedGroups2.addAll(authorizedGroups); // Ensure we don't re-lock the same row
txn.start();
// Prevents other threads/management servers from creating duplicate ingress rules
securityGroup = _securityGroupDao.acquireInLockTable(securityGroupId);
if (securityGroup == null) {
s_logger.warn("Could not acquire lock on network security group: id= " + securityGroupId);
return null;
}
List<EgressRuleVO> newRules = new ArrayList<EgressRuleVO>();
try {
for (final SecurityGroupVO ngVO : authorizedGroups2) {
final Long ngId = ngVO.getId();
// Don't delete the referenced group from under us
if (ngVO.getId() != securityGroup.getId()) {
final SecurityGroupVO tmpGrp = _securityGroupDao.lockRow(ngId, false);
if (tmpGrp == null) {
s_logger.warn("Failed to acquire lock on security group: " + ngId);
txn.rollback();
return null;
}
}
EgressRuleVO egressRule = _egressRuleDao.findByProtoPortsAndAllowedGroupId(securityGroup.getId(), protocol, startPortOrType, endPortOrCode, ngVO.getId());
if (egressRule != null) {
continue; // rule already exists.
}
egressRule = new EgressRuleVO(securityGroup.getId(), startPortOrType, endPortOrCode, protocol, ngVO.getId());
egressRule = _egressRuleDao.persist(egressRule);
newRules.add(egressRule);
}
if (cidrList != null) {
for (String cidr : cidrList) {
EgressRuleVO egressRule = _egressRuleDao.findByProtoPortsAndCidr(securityGroup.getId(), protocol, startPortOrType, endPortOrCode, cidr);
if (egressRule != null) {
continue;
}
egressRule = new EgressRuleVO(securityGroup.getId(), startPortOrType, endPortOrCode, protocol, cidr);
egressRule = _egressRuleDao.persist(egressRule);
newRules.add(egressRule);
}
}
if (s_logger.isDebugEnabled()) {
s_logger.debug("Added " + newRules.size() + " rules to security group " + securityGroup.getName());
}
txn.commit();
final Set<Long> affectedVms = new HashSet<Long>();
affectedVms.addAll(_securityGroupVMMapDao.listVmIdsBySecurityGroup(securityGroup.getId()));
scheduleRulesetUpdateToHosts(affectedVms, true, null);
return newRules;
} catch (Exception e) {
s_logger.warn("Exception caught when adding ingress rules ", e);
throw new CloudRuntimeException("Exception caught when adding ingress rules", e);
} finally {
if (securityGroup != null) {
_securityGroupDao.releaseFromLockTable(securityGroup.getId());
}
}
}
@Override
@DB
public boolean revokeSecurityGroupEgress(RevokeSecurityGroupEgressCmd cmd) {
// input validation
Account caller = UserContext.current().getCaller();
Long id = cmd.getId();
IngressRuleVO rule = _ingressRuleDao.findById(id);
if (rule == null) {
s_logger.debug("Unable to find ingress rule with id " + id);
throw new InvalidParameterValueException("Unable to find ingress rule with id " + id);
}
// Check permissions
SecurityGroup securityGroup = _securityGroupDao.findById(rule.getSecurityGroupId());
_accountMgr.checkAccess(caller, null, securityGroup);
SecurityGroupVO groupHandle = null;
final Transaction txn = Transaction.currentTxn();
try {
txn.start();
// acquire lock on parent group (preserving this logic)
groupHandle = _securityGroupDao.acquireInLockTable(rule.getSecurityGroupId());
if (groupHandle == null) {
s_logger.warn("Could not acquire lock on security group id: " + rule.getSecurityGroupId());
return false;
}
_ingressRuleDao.remove(id);
s_logger.debug("revokeSecurityGroupIngress succeeded for ingress rule id: " + id);
final Set<Long> affectedVms = new HashSet<Long>();
affectedVms.addAll(_securityGroupVMMapDao.listVmIdsBySecurityGroup(groupHandle.getId()));
scheduleRulesetUpdateToHosts(affectedVms, true, null);
return true;
} catch (Exception e) {
s_logger.warn("Exception caught when deleting ingress rules ", e);
throw new CloudRuntimeException("Exception caught when deleting ingress rules", e);
} finally {
if (groupHandle != null) {
_securityGroupDao.releaseFromLockTable(groupHandle.getId());
}
txn.commit();
}
}
@Override
@ActionEvent(eventType = EventTypes.EVENT_SECURITY_GROUP_CREATE, eventDescription = "creating security group")
public SecurityGroupVO createSecurityGroup(CreateSecurityGroupCmd cmd) throws PermissionDeniedException, InvalidParameterValueException {
@ -1121,9 +836,10 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
if (s_logger.isDebugEnabled()) {
s_logger.debug("Security Group work: found a job in done state, rescheduling for vm: " + userVmId);
}
Set<Long> affectedVms = new HashSet<Long>();
ArrayList<Long> affectedVms = new ArrayList<Long>();
affectedVms.add(userVmId);
scheduleRulesetUpdateToHosts(affectedVms, true, _timeBetweenCleanups*1000l);
return;
}
UserVm vm = null;
Long seqnum = null;
@ -1153,23 +869,11 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
seqnum = log.getLogsequence();
if (vm != null && vm.getState() == State.Running) {
Map<PortAndProto, Set<String>> ingressRules = generateIngressRulesForVM(userVmId);
Map<PortAndProto, Set<String>> egressRules = generateEgressRulesForVM(userVmId);
Map<PortAndProto, Set<String>> rules = generateRulesForVM(userVmId);
agentId = vm.getHostId();
if (agentId != null) {
_rulesetLogDao.findByVmId(work.getInstanceId());
SecurityIngressRulesCmd ingressCmd = generateIngressRulesetCmd(vm.getInstanceName(), vm.getPrivateIpAddress(), vm.getPrivateMacAddress(), vm.getId(), generateRulesetSignature(ingressRules), seqnum,
ingressRules);
Commands ingressCmds = new Commands(ingressCmd);
try {
_agentMgr.send(agentId, ingressCmds, _answerListener);
} catch (AgentUnavailableException e) {
s_logger.debug("Unable to send updates for vm: " + userVmId + "(agentid=" + agentId + ")");
_workDao.updateStep(work.getInstanceId(), seqnum, Step.Done);
}
SecurityEgressRulesCmd cmd = generateEgressRulesetCmd(vm.getInstanceName(), vm.getPrivateIpAddress(), vm.getPrivateMacAddress(), vm.getId(), generateRulesetSignature(egressRules), seqnum,
egressRules);
SecurityIngressRulesCmd cmd = generateRulesetCmd(vm.getInstanceName(), vm.getPrivateIpAddress(), vm.getPrivateMacAddress(), vm.getId(), generateRulesetSignature(rules), seqnum,
rules);
Commands cmds = new Commands(cmd);
try {
_agentMgr.send(agentId, cmds, _answerListener);
@ -1407,7 +1111,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
@Override
public void fullSync(long agentId, HashMap<String, Pair<Long, Long>> newGroupStates) {
Set<Long> affectedVms = new HashSet<Long>();
ArrayList<Long> affectedVms = new ArrayList<Long>();
for (String vmName : newGroupStates.keySet()) {
Long vmId = newGroupStates.get(vmName).first();
Long seqno = newGroupStates.get(vmName).second();
@ -1438,7 +1142,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
List<SecurityGroupWorkVO> unfinished = _workDao.findUnfinishedWork(before);
if (unfinished.size() > 0) {
s_logger.info("Network Group Work cleanup found " + unfinished.size() + " unfinished work items older than " + before.toString());
Set<Long> affectedVms = new HashSet<Long>();
ArrayList<Long> affectedVms = new ArrayList<Long>();
for (SecurityGroupWorkVO work : unfinished) {
affectedVms.add(work.getInstanceId());
work.setStep(Step.Error);

View File

@ -0,0 +1,157 @@
/**
* Copyright (C) 2011 Citrix Systems, Inc. All rights reserved.
*
* This software is licensed under the GNU General Public License v3 or later.
*
* It is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package com.cloud.network.security;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import javax.naming.ConfigurationException;
import com.cloud.agent.api.SecurityIngressRulesCmd;
import com.cloud.agent.manager.Commands;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.network.security.SecurityGroupWork.Step;
import com.cloud.uservm.UserVm;
import com.cloud.utils.db.Transaction;
import com.cloud.vm.VirtualMachine.State;
/**
* Same as the base class -- except it uses the abstracted security group work queue
*
*/
public class SecurityGroupManagerImpl2 extends SecurityGroupManagerImpl {
SecurityGroupWorkQueue _workQueue = new LocalSecurityGroupWorkQueue();
WorkerThread[] _workers;
long _timeToSleep = 10000;
protected class WorkerThread extends Thread {
public WorkerThread(String name) {
super(name);
}
@Override
public void run() {
work();
}
}
@Override
public void scheduleRulesetUpdateToHosts(List<Long> affectedVms, boolean updateSeqno, Long delayMs) {
if (affectedVms.size() == 0) {
return;
}
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr2: scheduling ruleset updates for " + affectedVms.size() + " vms");
}
Set<Long> workItems = new TreeSet<Long>();
workItems.addAll(affectedVms);
Transaction txn = Transaction.currentTxn();
txn.start();
_rulesetLogDao.createOrUpdate(workItems);
txn.commit();
_workQueue.submitWorkForVms(workItems);
notifyAll();
}
@Override
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
// TODO Auto-generated method stub
return super.configure(name, params);
}
@Override
public boolean start() {
// TODO Auto-generated method stub
return super.start();
}
@Override
public void work() {
while (true) {
try {
s_logger.trace("Checking the work queue");
List<SecurityGroupWork> workItems = _workQueue.getWork(1);
for (SecurityGroupWork work: workItems) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Processing " + work.getInstanceId());
}
try {
VmRulesetLogVO rulesetLog = _rulesetLogDao.findByVmId(work.getInstanceId());
if (rulesetLog == null) {
s_logger.warn("Could not find ruleset log for vm " + work.getInstanceId());
continue;
}
work.setLogsequenceNumber(rulesetLog.getLogsequence());
sendRulesetUpdates(work);
}catch (Exception e) {
s_logger.error("Problem during SG work " + work, e);
work.setStep(Step.Error);
}
}
} catch (final Throwable th) {
s_logger.error("Caught this throwable, ", th);
}
}
}
protected void sendRulesetUpdates(SecurityGroupWork work){
Long userVmId = work.getInstanceId();
UserVm vm = _userVMDao.findById(userVmId);
if (vm != null && vm.getState() == State.Running) {
Map<PortAndProto, Set<String>> rules = generateRulesForVM(userVmId);
Long agentId = vm.getHostId();
if (agentId != null) {
SecurityIngressRulesCmd cmd = generateRulesetCmd(vm.getInstanceName(), vm.getPrivateIpAddress(),
vm.getPrivateMacAddress(), vm.getId(), generateRulesetSignature(rules),
work.getLogsequenceNumber(), rules);
Commands cmds = new Commands(cmd);
try {
_agentMgr.send(agentId, cmds, _answerListener);
} catch (AgentUnavailableException e) {
s_logger.debug("Unable to send updates for vm: " + userVmId + "(agentid=" + agentId + ")");
}
}
} else {
if (s_logger.isTraceEnabled()) {
if (vm != null)
s_logger.trace("No rules sent to vm " + vm + "state=" + vm.getState());
else
s_logger.trace("Could not find vm: No rules sent to vm " + userVmId );
}
}
}
@Override
public void cleanupFinishedWork() {
//no-op
}
}

View File

@ -0,0 +1,40 @@
/**
* Copyright (C) 2011 Citrix Systems, Inc. All rights reserved.
*
* This software is licensed under the GNU General Public License v3 or later.
*
* It is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package com.cloud.network.security;
import java.util.List;
import java.util.Set;
/**
* Security Group Work queue
* standard producer / consumer interface
*
*/
public interface SecurityGroupWorkQueue {
void submitWorkForVm(long vmId, long sequenceNumber);
int submitWorkForVms(Set<Long> vmIds);
List<SecurityGroupWork> getWork(int numberOfWorkItems);
int size();
void clear();
}

View File

@ -21,12 +21,13 @@ package com.cloud.network.security.dao;
import java.util.Date;
import java.util.List;
import com.cloud.network.security.SecurityGroupWork;
import com.cloud.network.security.SecurityGroupWorkVO;
import com.cloud.network.security.SecurityGroupWorkVO.Step;
import com.cloud.network.security.SecurityGroupWork.Step;
import com.cloud.utils.db.GenericDao;
public interface SecurityGroupWorkDao extends GenericDao<SecurityGroupWorkVO, Long> {
SecurityGroupWorkVO findByVmId(long vmId, boolean taken);
SecurityGroupWork findByVmId(long vmId, boolean taken);
SecurityGroupWorkVO findByVmIdStep(long vmId, Step step);

View File

@ -26,8 +26,9 @@ import javax.ejb.Local;
import org.apache.log4j.Logger;
import com.cloud.ha.HaWorkVO;
import com.cloud.network.security.SecurityGroupWork;
import com.cloud.network.security.SecurityGroupWorkVO;
import com.cloud.network.security.SecurityGroupWorkVO.Step;
import com.cloud.network.security.SecurityGroupWork.Step;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.Filter;
import com.cloud.utils.db.GenericDaoBase;
@ -91,7 +92,7 @@ public class SecurityGroupWorkDaoImpl extends GenericDaoBase<SecurityGroupWorkVO
}
@Override
public SecurityGroupWorkVO findByVmId(long vmId, boolean taken) {
public SecurityGroupWork findByVmId(long vmId, boolean taken) {
SearchCriteria<SecurityGroupWorkVO> sc = taken?VmIdTakenSearch.create():VmIdUnTakenSearch.create();
sc.setParameters("vmId", vmId);
return findOneIncludingRemovedBy(sc);
@ -130,9 +131,9 @@ public class SecurityGroupWorkDaoImpl extends GenericDaoBase<SecurityGroupWorkVO
if (processing) {
//the caller to take() should check the step and schedule another work item to come back
//and take a look.
work.setStep(SecurityGroupWorkVO.Step.Done);
work.setStep(SecurityGroupWork.Step.Done);
} else {
work.setStep(SecurityGroupWorkVO.Step.Processing);
work.setStep(SecurityGroupWork.Step.Processing);
}
update(work.getId(), work);

View File

@ -18,10 +18,14 @@
package com.cloud.network.security.dao;
import java.util.Set;
import com.cloud.network.security.VmRulesetLogVO;
import com.cloud.utils.db.GenericDao;
public interface VmRulesetLogDao extends GenericDao<VmRulesetLogVO, Long> {
VmRulesetLogVO findByVmId(long vmId);
void createOrUpdate(Set<Long> workItems);
}

View File

@ -18,17 +18,27 @@
package com.cloud.network.security.dao;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Set;
import javax.ejb.Local;
import org.apache.log4j.Logger;
import com.cloud.network.security.VmRulesetLogVO;
import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.Transaction;
@Local(value={VmRulesetLogDao.class})
public class VmRulesetLogDaoImpl extends GenericDaoBase<VmRulesetLogVO, Long> implements VmRulesetLogDao {
public class VmRulesetLogDaoImpl extends GenericDaoBase<VmRulesetLogVO, Long> implements VmRulesetLogDao {
protected static Logger s_logger = Logger.getLogger(VmRulesetLogDaoImpl.class);
private SearchBuilder<VmRulesetLogVO> VmIdSearch;
private String INSERT_OR_UPDATE = "INSERT INTO op_vm_ruleset_log (instance_id, created, logsequence) " +
" VALUES(?, now(), 1) ON DUPLICATE KEY UPDATE logsequence=logsequence+1";
protected VmRulesetLogDaoImpl() {
VmIdSearch = createSearchBuilder();
@ -43,6 +53,39 @@ public class VmRulesetLogDaoImpl extends GenericDaoBase<VmRulesetLogVO, Long> im
SearchCriteria<VmRulesetLogVO> sc = VmIdSearch.create();
sc.setParameters("vmId", vmId);
return findOneIncludingRemovedBy(sc);
}
@Override
public void createOrUpdate(Set<Long> workItems) {
Transaction txn = Transaction.currentTxn();
PreparedStatement stmtInsert = null;
int [] queryResult = null;
boolean success = true;
try {
stmtInsert = txn.prepareAutoCloseStatement(INSERT_OR_UPDATE);
for (Long vmId: workItems) {
stmtInsert.setLong(1, vmId);
stmtInsert.addBatch();
}
queryResult = stmtInsert.executeBatch();
txn.commit();
if (s_logger.isTraceEnabled())
s_logger.trace("Updated or inserted " + workItems.size() + " log items");
} catch (SQLException e) {
s_logger.warn("Failed to execute batch update statement for ruleset log");
success = false;
}
if (!success && queryResult != null) {
Long [] arrayItems = new Long[workItems.size()];
workItems.toArray(arrayItems);
for (int i=0; i < queryResult.length; i++) {
if (queryResult[i] < 0 ) {
s_logger.debug("Batch query update failed for vm " + arrayItems[i]);
}
}
}
return;
}

View File

@ -0,0 +1,165 @@
package com.cloud.network.security;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import junit.framework.TestCase;
public class SecurityGroupQueueTest extends TestCase {
public final static SecurityGroupWorkQueue queue = new LocalSecurityGroupWorkQueue();
public static class Producer implements Runnable {
int _maxVmId = 0;
int _newWorkQueued=0;
Set<Long> vmIds = new HashSet<Long>();
public Producer(int maxVmId) {
this._maxVmId = maxVmId;
for (long i=1; i <= _maxVmId; i++) {
vmIds.add(i);
}
}
public void run() {
_newWorkQueued = queue.submitWorkForVms(vmIds);
}
public int getNewWork() {
return _newWorkQueued;
}
public int getTotalWork() {
return _maxVmId;
}
}
public static class Consumer implements Runnable {
private int _numJobsToDequeue = 0;
private int _numJobsDequeued = 0;
public Consumer(int numJobsToDequeu) {
this._numJobsToDequeue = numJobsToDequeu;
}
public void run() {
List<SecurityGroupWork> result = queue.getWork(_numJobsToDequeue);
this._numJobsDequeued = result.size();
}
int getNumJobsToDequeue() {
return _numJobsToDequeue;
}
int getNumJobsDequeued() {
return _numJobsDequeued;
}
}
public void testNumJobsEqToNumVms1() {
queue.clear();
final int numProducers = 50;
Thread [] pThreads = new Thread[numProducers];
Producer [] producers = new Producer[numProducers];
int numProduced = 0;
for (int i=0; i < numProducers; i++) {
producers[i] = new Producer(i+1);
pThreads[i] = new Thread(producers[i]);
numProduced += i+1;
pThreads[i].start();
}
for (int i=0; i < numProducers ; i++) {
try {
pThreads[i].join();
} catch (InterruptedException ie){
ie.printStackTrace();
}
}
System.out.println("Num Vms= " + numProducers + " Queue size = " + queue.size());
assert(numProducers == queue.size());
}
public void testNumJobsEqToNumVms2() {
queue.clear();
final int numProducers = 50;
Thread [] pThreads = new Thread[numProducers];
Producer [] producers = new Producer[numProducers];
int numProduced = 0;
int maxVmId = 10000;
for (int i=0; i < numProducers; i++) {
producers[i] = new Producer(maxVmId);
pThreads[i] = new Thread(producers[i]);
numProduced += i+1;
pThreads[i].start();
}
for (int i=0; i < numProducers ; i++) {
try {
pThreads[i].join();
} catch (InterruptedException ie){
ie.printStackTrace();
}
}
System.out.println("Num Vms= " + maxVmId + " Queue size = " + queue.size());
assert(maxVmId == queue.size());
}
public void testDequeueOneJob() {
queue.clear();
final int numProducers = 2;
final int numConsumers = 5;
final int maxVmId = 200;
Thread [] pThreads = new Thread[numProducers];
Thread [] cThreads = new Thread[numConsumers];
Consumer [] consumers = new Consumer[numConsumers];
Producer [] producers = new Producer[numProducers];
int numProduced = 0;
for (int i=0; i < numConsumers; i++) {
consumers[i] = new Consumer(1);
cThreads[i] = new Thread(consumers[i]);
cThreads[i].start();
}
for (int i=0; i < numProducers; i++) {
producers[i] = new Producer(maxVmId);
pThreads[i] = new Thread(producers[i]);
numProduced += maxVmId;
pThreads[i].start();
}
for (int i=0; i < numConsumers ; i++) {
try {
cThreads[i].join();
} catch (InterruptedException ie){
ie.printStackTrace();
}
}
// try {
// Thread.sleep(2000);
// } catch (InterruptedException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
int totalDequeued = 0;
for (int i=0; i < numConsumers; i++) {
//System.out.println("Consumer " + i + " ask to dequeue " + consumers[i].getNumJobsToDequeue() + ", dequeued " + consumers[i].getNumJobsDequeued());
totalDequeued += consumers[i].getNumJobsDequeued();
}
int totalQueued = 0;
for (int i=0; i < numProducers; i++) {
//System.out.println("Producer " + i + " ask to queue " + producers[i].getTotalWork() + ", queued " + producers[i].getNewWork());
totalQueued += producers[i].getNewWork();
}
System.out.println("Total jobs dequeued = " + totalDequeued + ", num queued=" + totalQueued + " queue current size=" + queue.size());
assert(totalDequeued == numConsumers);
assert(totalQueued - totalDequeued == queue.size());
}
}

View File

@ -0,0 +1,59 @@
package com.cloud.utils.db;
import org.apache.log4j.Logger;
import junit.framework.Assert;
import com.cloud.utils.Profiler;
import com.cloud.utils.testcase.Log4jEnabledTestCase;
public class GlobalLockTest extends Log4jEnabledTestCase{
public static final Logger s_logger = Logger.getLogger(GlobalLockTest.class);
private final static GlobalLock _workLock = GlobalLock.getInternLock("SecurityGroupWork");
public static class Worker implements Runnable {
int id = 0;
int timeoutSeconds = 10;
int jobDuration = 2;
public Worker(int id, int timeout, int duration) {
this.id = id;
timeoutSeconds = timeout;
jobDuration = duration;
}
public void run() {
boolean locked = false;
try {
Profiler p = new Profiler();
p.start();
locked = _workLock.lock(timeoutSeconds);
p.stop();
System.out.println("Thread " + id + " waited " + p.getDuration() + " ms, locked=" + locked);
if (locked) {
Thread.sleep(jobDuration*1000);
}
} catch (InterruptedException e) {
} finally {
if (locked) {
boolean unlocked = _workLock.unlock();
System.out.println("Thread " + id + " unlocked=" + unlocked);
}
}
}
}
public void testTimeout() {
Thread [] pool = new Thread[50];
for (int i=0; i < pool.length; i++) {
pool[i] = new Thread(new Worker(i, 5, 3));
}
for (int i=0; i < pool.length; i++) {
pool[i].start();
}
for (int i=0; i < pool.length; i++) {
try {
pool[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}