WIP : memory based queueing

This commit is contained in:
Chiradeep Vittal 2011-08-22 22:49:27 -07:00
parent fc274739f0
commit 6465ccff1b
11 changed files with 371 additions and 20 deletions

View File

@ -0,0 +1,39 @@
/**
* Copyright (C) 2011 Citrix Systems, Inc. All rights reserved.
*
* This software is licensed under the GNU General Public License v3 or later.
*
* It is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package com.cloud.network.security;
/**
* Work related to security groups for a vm
*
*/
public interface SecurityGroupWork {
public enum Step {
Scheduled,
Processing,
Done,
Error
}
Long getInstanceId();
Long getLogsequenceNumber();
Step getStep();
void setStep(Step step);
public abstract void setLogsequenceNumber(Long logsequenceNumber);
}

View File

@ -34,13 +34,8 @@ import com.cloud.utils.db.GenericDao;
@Entity
@Table(name="op_nwgrp_work")
public class SecurityGroupWorkVO {
public enum Step {
Scheduled,
Processing,
Done,
Error
}
public class SecurityGroupWorkVO implements SecurityGroupWork{
@Id
@GeneratedValue(strategy=GenerationType.IDENTITY)
@ -132,7 +127,8 @@ public class SecurityGroupWorkVO {
return logsequenceNumber;
}
public void setLogsequenceNumber(Long logsequenceNumber) {
@Override
public void setLogsequenceNumber(Long logsequenceNumber) {
this.logsequenceNumber = logsequenceNumber;
}

View File

@ -0,0 +1,74 @@
/**
* Copyright (C) 2011 Citrix Systems, Inc. All rights reserved.
*
* This software is licensed under the GNU General Public License v3 or later.
*
* It is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package com.cloud.network.security;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.log4j.Logger;
import com.cloud.network.security.SecurityGroupWork.Step;
/**
* Security Group Work Queue that is not shared with other management servers
*
*/
public class LocalSecurityGroupWorkQueue implements SecurityGroupWorkQueue {
protected static Logger s_logger = Logger.getLogger(LocalSecurityGroupWorkQueue.class);
protected LinkedBlockingQueue<SecurityGroupWork> _queue = new LinkedBlockingQueue<SecurityGroupWork>();
@Override
public void submitWorkForVm(long vmId, long sequenceNumber) {
SecurityGroupWorkVO work = new SecurityGroupWorkVO(vmId, null, new Date(), SecurityGroupWork.Step.Scheduled, null);
boolean result = _queue.offer(work);
if (!result) {
s_logger.warn("Failed to add work item into queue for vm id " + vmId);
}
}
@Override
public void submitWorkForVms(Set<Long> vmIds) {
for (Long vmId: vmIds) {
SecurityGroupWorkVO work = new SecurityGroupWorkVO(vmId, null, new Date(), SecurityGroupWork.Step.Scheduled, null);
boolean result = _queue.offer(work);
if (!result) {
s_logger.warn("Failed to add work item into queue for vm id " + vmId);
}
}
}
@Override
public List<SecurityGroupWork> getWork(int numberOfWorkItems) {
List<SecurityGroupWork> work = new ArrayList<SecurityGroupWork>();
_queue.drainTo(work, numberOfWorkItems);
for (SecurityGroupWork w: work) {
w.setStep(Step.Processing);
}
return work;
}
}

View File

@ -37,7 +37,7 @@ import com.cloud.agent.manager.Commands;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.host.HostVO;
import com.cloud.host.Status;
import com.cloud.network.security.SecurityGroupWorkVO.Step;
import com.cloud.network.security.SecurityGroupWork.Step;
import com.cloud.network.security.dao.SecurityGroupWorkDao;
/**

View File

@ -64,7 +64,7 @@ import com.cloud.exception.PermissionDeniedException;
import com.cloud.exception.ResourceInUseException;
import com.cloud.hypervisor.Hypervisor.HypervisorType;
import com.cloud.network.NetworkManager;
import com.cloud.network.security.SecurityGroupWorkVO.Step;
import com.cloud.network.security.SecurityGroupWork.Step;
import com.cloud.network.security.dao.IngressRuleDao;
import com.cloud.network.security.dao.SecurityGroupDao;
import com.cloud.network.security.dao.SecurityGroupRulesDao;
@ -355,7 +355,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
return allowed;
}
private String generateRulesetSignature(Map<PortAndProto, Set<String>> allowed) {
protected String generateRulesetSignature(Map<PortAndProto, Set<String>> allowed) {
String ruleset = allowed.toString();
return DigestUtils.md5Hex(ruleset);
}
@ -412,7 +412,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
}
work = _workDao.findByVmIdStep(vmId, Step.Scheduled);
if (work == null) {
work = new SecurityGroupWorkVO(vmId, null, null, SecurityGroupWorkVO.Step.Scheduled, null);
work = new SecurityGroupWorkVO(vmId, null, null, SecurityGroupWork.Step.Scheduled, null);
work = _workDao.persist(work);
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr: created new work item for " + vmId + "; id = " + work.getId());

View File

@ -0,0 +1,157 @@
/**
* Copyright (C) 2011 Citrix Systems, Inc. All rights reserved.
*
* This software is licensed under the GNU General Public License v3 or later.
*
* It is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package com.cloud.network.security;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import javax.naming.ConfigurationException;
import com.cloud.agent.api.SecurityIngressRulesCmd;
import com.cloud.agent.manager.Commands;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.network.security.SecurityGroupWork.Step;
import com.cloud.uservm.UserVm;
import com.cloud.utils.db.Transaction;
import com.cloud.vm.VirtualMachine.State;
/**
* Same as the base class -- except it uses the abstracted security group work queue
*
*/
public class SecurityGroupManagerImpl2 extends SecurityGroupManagerImpl {
SecurityGroupWorkQueue _workQueue = new LocalSecurityGroupWorkQueue();
WorkerThread[] _workers;
long _timeToSleep = 10000;
protected class WorkerThread extends Thread {
public WorkerThread(String name) {
super(name);
}
@Override
public void run() {
work();
}
}
@Override
public void scheduleRulesetUpdateToHosts(List<Long> affectedVms, boolean updateSeqno, Long delayMs) {
if (affectedVms.size() == 0) {
return;
}
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr2: scheduling ruleset updates for " + affectedVms.size() + " vms");
}
Set<Long> workItems = new TreeSet<Long>();
workItems.addAll(affectedVms);
Transaction txn = Transaction.currentTxn();
txn.start();
_rulesetLogDao.createOrUpdate(workItems);
txn.commit();
_workQueue.submitWorkForVms(workItems);
notifyAll();
}
@Override
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
// TODO Auto-generated method stub
return super.configure(name, params);
}
@Override
public boolean start() {
// TODO Auto-generated method stub
return super.start();
}
@Override
public void work() {
while (true) {
try {
s_logger.trace("Checking the work queue");
List<SecurityGroupWork> workItems = _workQueue.getWork(1);
for (SecurityGroupWork work: workItems) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Processing " + work.getInstanceId());
}
try {
VmRulesetLogVO rulesetLog = _rulesetLogDao.findByVmId(work.getInstanceId());
if (rulesetLog == null) {
s_logger.warn("Could not find ruleset log for vm " + work.getInstanceId());
continue;
}
work.setLogsequenceNumber(rulesetLog.getLogsequence());
sendRulesetUpdates(work);
}catch (Exception e) {
s_logger.error("Problem during SG work " + work, e);
work.setStep(Step.Error);
}
}
} catch (final Throwable th) {
s_logger.error("Caught this throwable, ", th);
}
}
}
protected void sendRulesetUpdates(SecurityGroupWork work){
Long userVmId = work.getInstanceId();
UserVm vm = _userVMDao.findById(userVmId);
if (vm != null && vm.getState() == State.Running) {
Map<PortAndProto, Set<String>> rules = generateRulesForVM(userVmId);
Long agentId = vm.getHostId();
if (agentId != null) {
SecurityIngressRulesCmd cmd = generateRulesetCmd(vm.getInstanceName(), vm.getPrivateIpAddress(),
vm.getPrivateMacAddress(), vm.getId(), generateRulesetSignature(rules),
work.getLogsequenceNumber(), rules);
Commands cmds = new Commands(cmd);
try {
_agentMgr.send(agentId, cmds, _answerListener);
} catch (AgentUnavailableException e) {
s_logger.debug("Unable to send updates for vm: " + userVmId + "(agentid=" + agentId + ")");
}
}
} else {
if (s_logger.isTraceEnabled()) {
if (vm != null)
s_logger.trace("No rules sent to vm " + vm + "state=" + vm.getState());
else
s_logger.trace("Could not find vm: No rules sent to vm " + userVmId );
}
}
}
@Override
public void cleanupFinishedWork() {
//no-op
}
}

View File

@ -0,0 +1,36 @@
/**
* Copyright (C) 2011 Citrix Systems, Inc. All rights reserved.
*
* This software is licensed under the GNU General Public License v3 or later.
*
* It is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package com.cloud.network.security;
import java.util.List;
import java.util.Set;
/**
* Security Group Work queue
* standard producer / consumer interface
*
*/
public interface SecurityGroupWorkQueue {
void submitWorkForVm(long vmId, long sequenceNumber);
void submitWorkForVms(Set<Long> vmIds);
List<SecurityGroupWork> getWork(int numberOfWorkItems);
}

View File

@ -21,12 +21,13 @@ package com.cloud.network.security.dao;
import java.util.Date;
import java.util.List;
import com.cloud.network.security.SecurityGroupWork;
import com.cloud.network.security.SecurityGroupWorkVO;
import com.cloud.network.security.SecurityGroupWorkVO.Step;
import com.cloud.network.security.SecurityGroupWork.Step;
import com.cloud.utils.db.GenericDao;
public interface SecurityGroupWorkDao extends GenericDao<SecurityGroupWorkVO, Long> {
SecurityGroupWorkVO findByVmId(long vmId, boolean taken);
SecurityGroupWork findByVmId(long vmId, boolean taken);
SecurityGroupWorkVO findByVmIdStep(long vmId, Step step);

View File

@ -26,8 +26,9 @@ import javax.ejb.Local;
import org.apache.log4j.Logger;
import com.cloud.ha.HaWorkVO;
import com.cloud.network.security.SecurityGroupWork;
import com.cloud.network.security.SecurityGroupWorkVO;
import com.cloud.network.security.SecurityGroupWorkVO.Step;
import com.cloud.network.security.SecurityGroupWork.Step;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.Filter;
import com.cloud.utils.db.GenericDaoBase;
@ -91,7 +92,7 @@ public class SecurityGroupWorkDaoImpl extends GenericDaoBase<SecurityGroupWorkVO
}
@Override
public SecurityGroupWorkVO findByVmId(long vmId, boolean taken) {
public SecurityGroupWork findByVmId(long vmId, boolean taken) {
SearchCriteria<SecurityGroupWorkVO> sc = taken?VmIdTakenSearch.create():VmIdUnTakenSearch.create();
sc.setParameters("vmId", vmId);
return findOneIncludingRemovedBy(sc);
@ -130,9 +131,9 @@ public class SecurityGroupWorkDaoImpl extends GenericDaoBase<SecurityGroupWorkVO
if (processing) {
//the caller to take() should check the step and schedule another work item to come back
//and take a look.
work.setStep(SecurityGroupWorkVO.Step.Done);
work.setStep(SecurityGroupWork.Step.Done);
} else {
work.setStep(SecurityGroupWorkVO.Step.Processing);
work.setStep(SecurityGroupWork.Step.Processing);
}
update(work.getId(), work);

View File

@ -18,10 +18,14 @@
package com.cloud.network.security.dao;
import java.util.Set;
import com.cloud.network.security.VmRulesetLogVO;
import com.cloud.utils.db.GenericDao;
public interface VmRulesetLogDao extends GenericDao<VmRulesetLogVO, Long> {
VmRulesetLogVO findByVmId(long vmId);
void createOrUpdate(Set<Long> workItems);
}

View File

@ -18,17 +18,27 @@
package com.cloud.network.security.dao;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Set;
import javax.ejb.Local;
import org.apache.log4j.Logger;
import com.cloud.network.security.VmRulesetLogVO;
import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.Transaction;
@Local(value={VmRulesetLogDao.class})
public class VmRulesetLogDaoImpl extends GenericDaoBase<VmRulesetLogVO, Long> implements VmRulesetLogDao {
public class VmRulesetLogDaoImpl extends GenericDaoBase<VmRulesetLogVO, Long> implements VmRulesetLogDao {
protected static Logger s_logger = Logger.getLogger(VmRulesetLogDaoImpl.class);
private SearchBuilder<VmRulesetLogVO> VmIdSearch;
private String INSERT_OR_UPDATE = "INSERT INTO op_vm_ruleset_log (instance_id, created, logsequence) " +
" VALUES(?, now(), 1) ON DUPLICATE KEY UPDATE logsequence=logsequence+1";
protected VmRulesetLogDaoImpl() {
VmIdSearch = createSearchBuilder();
@ -43,6 +53,39 @@ public class VmRulesetLogDaoImpl extends GenericDaoBase<VmRulesetLogVO, Long> im
SearchCriteria<VmRulesetLogVO> sc = VmIdSearch.create();
sc.setParameters("vmId", vmId);
return findOneIncludingRemovedBy(sc);
}
@Override
public void createOrUpdate(Set<Long> workItems) {
Transaction txn = Transaction.currentTxn();
PreparedStatement stmtInsert = null;
int [] queryResult = null;
boolean success = true;
try {
stmtInsert = txn.prepareAutoCloseStatement(INSERT_OR_UPDATE);
for (Long vmId: workItems) {
stmtInsert.setLong(1, vmId);
stmtInsert.addBatch();
}
queryResult = stmtInsert.executeBatch();
txn.commit();
if (s_logger.isTraceEnabled())
s_logger.trace("Updated or inserted " + workItems.size() + " log items");
} catch (SQLException e) {
s_logger.warn("Failed to execute batch update statement for ruleset log");
success = false;
}
if (!success && queryResult != null) {
Long [] arrayItems = new Long[workItems.size()];
workItems.toArray(arrayItems);
for (int i=0; i < queryResult.length; i++) {
if (queryResult[i] < 0 ) {
s_logger.debug("Batch query update failed for vm " + arrayItems[i]);
}
}
}
return;
}