diff --git a/core/src/com/cloud/network/security/SecurityGroupWork.java b/core/src/com/cloud/network/security/SecurityGroupWork.java
new file mode 100644
index 00000000000..c2e096c6450
--- /dev/null
+++ b/core/src/com/cloud/network/security/SecurityGroupWork.java
@@ -0,0 +1,39 @@
+/**
+ * Copyright (C) 2011 Citrix Systems, Inc. All rights reserved.
+ *
+ * This software is licensed under the GNU General Public License v3 or later.
+ *
+ * It is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or any later version.
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *
+ */
+package com.cloud.network.security;
+
+/**
+ * Work related to security groups for a vm
+ *
+ */
+public interface SecurityGroupWork {
+
+ public enum Step {
+ Scheduled,
+ Processing,
+ Done,
+ Error
+ }
+
+ Long getInstanceId();
+ Long getLogsequenceNumber();
+ Step getStep();
+ void setStep(Step step);
+ public abstract void setLogsequenceNumber(Long logsequenceNumber);
+
+}
diff --git a/core/src/com/cloud/network/security/SecurityGroupWorkVO.java b/core/src/com/cloud/network/security/SecurityGroupWorkVO.java
index e95031e8aac..9171bbbf729 100644
--- a/core/src/com/cloud/network/security/SecurityGroupWorkVO.java
+++ b/core/src/com/cloud/network/security/SecurityGroupWorkVO.java
@@ -34,13 +34,8 @@ import com.cloud.utils.db.GenericDao;
@Entity
@Table(name="op_nwgrp_work")
-public class SecurityGroupWorkVO {
- public enum Step {
- Scheduled,
- Processing,
- Done,
- Error
- }
+public class SecurityGroupWorkVO implements SecurityGroupWork{
+
@Id
@GeneratedValue(strategy=GenerationType.IDENTITY)
@@ -132,7 +127,8 @@ public class SecurityGroupWorkVO {
return logsequenceNumber;
}
- public void setLogsequenceNumber(Long logsequenceNumber) {
+ @Override
+ public void setLogsequenceNumber(Long logsequenceNumber) {
this.logsequenceNumber = logsequenceNumber;
}
diff --git a/server/src/com/cloud/network/security/LocalSecurityGroupWorkQueue.java b/server/src/com/cloud/network/security/LocalSecurityGroupWorkQueue.java
new file mode 100644
index 00000000000..e3a2ebb14ef
--- /dev/null
+++ b/server/src/com/cloud/network/security/LocalSecurityGroupWorkQueue.java
@@ -0,0 +1,74 @@
+/**
+ * Copyright (C) 2011 Citrix Systems, Inc. All rights reserved.
+ *
+ * This software is licensed under the GNU General Public License v3 or later.
+ *
+ * It is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or any later version.
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *
+ */
+package com.cloud.network.security;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.log4j.Logger;
+
+import com.cloud.network.security.SecurityGroupWork.Step;
+
+
+/**
+ * Security Group Work Queue that is not shared with other management servers
+ *
+ */
+public class LocalSecurityGroupWorkQueue implements SecurityGroupWorkQueue {
+ protected static Logger s_logger = Logger.getLogger(LocalSecurityGroupWorkQueue.class);
+
+ protected LinkedBlockingQueue _queue = new LinkedBlockingQueue();
+
+ @Override
+ public void submitWorkForVm(long vmId, long sequenceNumber) {
+
+ SecurityGroupWorkVO work = new SecurityGroupWorkVO(vmId, null, new Date(), SecurityGroupWork.Step.Scheduled, null);
+ boolean result = _queue.offer(work);
+ if (!result) {
+ s_logger.warn("Failed to add work item into queue for vm id " + vmId);
+ }
+
+ }
+
+
+ @Override
+ public void submitWorkForVms(Set vmIds) {
+ for (Long vmId: vmIds) {
+ SecurityGroupWorkVO work = new SecurityGroupWorkVO(vmId, null, new Date(), SecurityGroupWork.Step.Scheduled, null);
+ boolean result = _queue.offer(work);
+ if (!result) {
+ s_logger.warn("Failed to add work item into queue for vm id " + vmId);
+ }
+ }
+ }
+
+
+ @Override
+ public List getWork(int numberOfWorkItems) {
+ List work = new ArrayList();
+ _queue.drainTo(work, numberOfWorkItems);
+ for (SecurityGroupWork w: work) {
+ w.setStep(Step.Processing);
+ }
+ return work;
+ }
+
+}
diff --git a/server/src/com/cloud/network/security/SecurityGroupListener.java b/server/src/com/cloud/network/security/SecurityGroupListener.java
index d19d0b4008f..8118ec444eb 100755
--- a/server/src/com/cloud/network/security/SecurityGroupListener.java
+++ b/server/src/com/cloud/network/security/SecurityGroupListener.java
@@ -37,7 +37,7 @@ import com.cloud.agent.manager.Commands;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.host.HostVO;
import com.cloud.host.Status;
-import com.cloud.network.security.SecurityGroupWorkVO.Step;
+import com.cloud.network.security.SecurityGroupWork.Step;
import com.cloud.network.security.dao.SecurityGroupWorkDao;
/**
diff --git a/server/src/com/cloud/network/security/SecurityGroupManagerImpl.java b/server/src/com/cloud/network/security/SecurityGroupManagerImpl.java
index 346acf180c3..4e8aadf3d9d 100755
--- a/server/src/com/cloud/network/security/SecurityGroupManagerImpl.java
+++ b/server/src/com/cloud/network/security/SecurityGroupManagerImpl.java
@@ -64,7 +64,7 @@ import com.cloud.exception.PermissionDeniedException;
import com.cloud.exception.ResourceInUseException;
import com.cloud.hypervisor.Hypervisor.HypervisorType;
import com.cloud.network.NetworkManager;
-import com.cloud.network.security.SecurityGroupWorkVO.Step;
+import com.cloud.network.security.SecurityGroupWork.Step;
import com.cloud.network.security.dao.IngressRuleDao;
import com.cloud.network.security.dao.SecurityGroupDao;
import com.cloud.network.security.dao.SecurityGroupRulesDao;
@@ -355,7 +355,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
return allowed;
}
- private String generateRulesetSignature(Map> allowed) {
+ protected String generateRulesetSignature(Map> allowed) {
String ruleset = allowed.toString();
return DigestUtils.md5Hex(ruleset);
}
@@ -412,7 +412,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
}
work = _workDao.findByVmIdStep(vmId, Step.Scheduled);
if (work == null) {
- work = new SecurityGroupWorkVO(vmId, null, null, SecurityGroupWorkVO.Step.Scheduled, null);
+ work = new SecurityGroupWorkVO(vmId, null, null, SecurityGroupWork.Step.Scheduled, null);
work = _workDao.persist(work);
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr: created new work item for " + vmId + "; id = " + work.getId());
diff --git a/server/src/com/cloud/network/security/SecurityGroupManagerImpl2.java b/server/src/com/cloud/network/security/SecurityGroupManagerImpl2.java
new file mode 100644
index 00000000000..59bdd57c1fa
--- /dev/null
+++ b/server/src/com/cloud/network/security/SecurityGroupManagerImpl2.java
@@ -0,0 +1,157 @@
+/**
+ * Copyright (C) 2011 Citrix Systems, Inc. All rights reserved.
+ *
+ * This software is licensed under the GNU General Public License v3 or later.
+ *
+ * It is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or any later version.
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *
+ */
+package com.cloud.network.security;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import javax.naming.ConfigurationException;
+
+import com.cloud.agent.api.SecurityIngressRulesCmd;
+import com.cloud.agent.manager.Commands;
+import com.cloud.exception.AgentUnavailableException;
+import com.cloud.network.security.SecurityGroupWork.Step;
+import com.cloud.uservm.UserVm;
+import com.cloud.utils.db.Transaction;
+import com.cloud.vm.VirtualMachine.State;
+
+/**
+ * Same as the base class -- except it uses the abstracted security group work queue
+ *
+ */
+public class SecurityGroupManagerImpl2 extends SecurityGroupManagerImpl {
+
+ SecurityGroupWorkQueue _workQueue = new LocalSecurityGroupWorkQueue();
+
+
+ WorkerThread[] _workers;
+ long _timeToSleep = 10000;
+
+
+ protected class WorkerThread extends Thread {
+ public WorkerThread(String name) {
+ super(name);
+ }
+
+ @Override
+ public void run() {
+ work();
+ }
+
+ }
+
+ @Override
+ public void scheduleRulesetUpdateToHosts(List affectedVms, boolean updateSeqno, Long delayMs) {
+ if (affectedVms.size() == 0) {
+ return;
+ }
+ if (s_logger.isTraceEnabled()) {
+ s_logger.trace("Security Group Mgr2: scheduling ruleset updates for " + affectedVms.size() + " vms");
+ }
+ Set workItems = new TreeSet();
+ workItems.addAll(affectedVms);
+
+ Transaction txn = Transaction.currentTxn();
+ txn.start();
+ _rulesetLogDao.createOrUpdate(workItems);
+ txn.commit();
+ _workQueue.submitWorkForVms(workItems);
+ notifyAll();
+
+ }
+
+ @Override
+ public boolean configure(String name, Map params) throws ConfigurationException {
+ // TODO Auto-generated method stub
+ return super.configure(name, params);
+ }
+
+ @Override
+ public boolean start() {
+ // TODO Auto-generated method stub
+ return super.start();
+ }
+
+ @Override
+ public void work() {
+ while (true) {
+ try {
+ s_logger.trace("Checking the work queue");
+ List workItems = _workQueue.getWork(1);
+
+ for (SecurityGroupWork work: workItems) {
+ if (s_logger.isTraceEnabled()) {
+ s_logger.trace("Processing " + work.getInstanceId());
+ }
+
+ try {
+ VmRulesetLogVO rulesetLog = _rulesetLogDao.findByVmId(work.getInstanceId());
+ if (rulesetLog == null) {
+ s_logger.warn("Could not find ruleset log for vm " + work.getInstanceId());
+ continue;
+ }
+ work.setLogsequenceNumber(rulesetLog.getLogsequence());
+ sendRulesetUpdates(work);
+ }catch (Exception e) {
+ s_logger.error("Problem during SG work " + work, e);
+ work.setStep(Step.Error);
+ }
+ }
+ } catch (final Throwable th) {
+ s_logger.error("Caught this throwable, ", th);
+ }
+ }
+ }
+
+ protected void sendRulesetUpdates(SecurityGroupWork work){
+ Long userVmId = work.getInstanceId();
+ UserVm vm = _userVMDao.findById(userVmId);
+
+ if (vm != null && vm.getState() == State.Running) {
+ Map> rules = generateRulesForVM(userVmId);
+ Long agentId = vm.getHostId();
+ if (agentId != null) {
+ SecurityIngressRulesCmd cmd = generateRulesetCmd(vm.getInstanceName(), vm.getPrivateIpAddress(),
+ vm.getPrivateMacAddress(), vm.getId(), generateRulesetSignature(rules),
+ work.getLogsequenceNumber(), rules);
+ Commands cmds = new Commands(cmd);
+ try {
+ _agentMgr.send(agentId, cmds, _answerListener);
+ } catch (AgentUnavailableException e) {
+ s_logger.debug("Unable to send updates for vm: " + userVmId + "(agentid=" + agentId + ")");
+ }
+ }
+ } else {
+ if (s_logger.isTraceEnabled()) {
+ if (vm != null)
+ s_logger.trace("No rules sent to vm " + vm + "state=" + vm.getState());
+ else
+ s_logger.trace("Could not find vm: No rules sent to vm " + userVmId );
+ }
+ }
+ }
+
+ @Override
+ public void cleanupFinishedWork() {
+ //no-op
+ }
+
+
+}
diff --git a/server/src/com/cloud/network/security/SecurityGroupWorkQueue.java b/server/src/com/cloud/network/security/SecurityGroupWorkQueue.java
new file mode 100644
index 00000000000..5667c92b461
--- /dev/null
+++ b/server/src/com/cloud/network/security/SecurityGroupWorkQueue.java
@@ -0,0 +1,36 @@
+/**
+ * Copyright (C) 2011 Citrix Systems, Inc. All rights reserved.
+ *
+ * This software is licensed under the GNU General Public License v3 or later.
+ *
+ * It is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or any later version.
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *
+ */
+package com.cloud.network.security;
+
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * Security Group Work queue
+ * standard producer / consumer interface
+ *
+ */
+public interface SecurityGroupWorkQueue {
+
+ void submitWorkForVm(long vmId, long sequenceNumber);
+
+ void submitWorkForVms(Set vmIds);
+
+ List getWork(int numberOfWorkItems);
+}
diff --git a/server/src/com/cloud/network/security/dao/SecurityGroupWorkDao.java b/server/src/com/cloud/network/security/dao/SecurityGroupWorkDao.java
index 3e5819a3263..43f7c34f3eb 100644
--- a/server/src/com/cloud/network/security/dao/SecurityGroupWorkDao.java
+++ b/server/src/com/cloud/network/security/dao/SecurityGroupWorkDao.java
@@ -21,12 +21,13 @@ package com.cloud.network.security.dao;
import java.util.Date;
import java.util.List;
+import com.cloud.network.security.SecurityGroupWork;
import com.cloud.network.security.SecurityGroupWorkVO;
-import com.cloud.network.security.SecurityGroupWorkVO.Step;
+import com.cloud.network.security.SecurityGroupWork.Step;
import com.cloud.utils.db.GenericDao;
public interface SecurityGroupWorkDao extends GenericDao {
- SecurityGroupWorkVO findByVmId(long vmId, boolean taken);
+ SecurityGroupWork findByVmId(long vmId, boolean taken);
SecurityGroupWorkVO findByVmIdStep(long vmId, Step step);
diff --git a/server/src/com/cloud/network/security/dao/SecurityGroupWorkDaoImpl.java b/server/src/com/cloud/network/security/dao/SecurityGroupWorkDaoImpl.java
index eea2da088f1..f40beb6df1b 100644
--- a/server/src/com/cloud/network/security/dao/SecurityGroupWorkDaoImpl.java
+++ b/server/src/com/cloud/network/security/dao/SecurityGroupWorkDaoImpl.java
@@ -26,8 +26,9 @@ import javax.ejb.Local;
import org.apache.log4j.Logger;
import com.cloud.ha.HaWorkVO;
+import com.cloud.network.security.SecurityGroupWork;
import com.cloud.network.security.SecurityGroupWorkVO;
-import com.cloud.network.security.SecurityGroupWorkVO.Step;
+import com.cloud.network.security.SecurityGroupWork.Step;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.Filter;
import com.cloud.utils.db.GenericDaoBase;
@@ -91,7 +92,7 @@ public class SecurityGroupWorkDaoImpl extends GenericDaoBase sc = taken?VmIdTakenSearch.create():VmIdUnTakenSearch.create();
sc.setParameters("vmId", vmId);
return findOneIncludingRemovedBy(sc);
@@ -130,9 +131,9 @@ public class SecurityGroupWorkDaoImpl extends GenericDaoBase {
VmRulesetLogVO findByVmId(long vmId);
+
+ void createOrUpdate(Set workItems);
}
diff --git a/server/src/com/cloud/network/security/dao/VmRulesetLogDaoImpl.java b/server/src/com/cloud/network/security/dao/VmRulesetLogDaoImpl.java
index cb536c2c421..1394646130a 100644
--- a/server/src/com/cloud/network/security/dao/VmRulesetLogDaoImpl.java
+++ b/server/src/com/cloud/network/security/dao/VmRulesetLogDaoImpl.java
@@ -18,17 +18,27 @@
package com.cloud.network.security.dao;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Set;
+
import javax.ejb.Local;
+import org.apache.log4j.Logger;
+
import com.cloud.network.security.VmRulesetLogVO;
import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
+import com.cloud.utils.db.Transaction;
@Local(value={VmRulesetLogDao.class})
-public class VmRulesetLogDaoImpl extends GenericDaoBase implements VmRulesetLogDao {
+public class VmRulesetLogDaoImpl extends GenericDaoBase implements VmRulesetLogDao {
+ protected static Logger s_logger = Logger.getLogger(VmRulesetLogDaoImpl.class);
private SearchBuilder VmIdSearch;
-
+ private String INSERT_OR_UPDATE = "INSERT INTO op_vm_ruleset_log (instance_id, created, logsequence) " +
+ " VALUES(?, now(), 1) ON DUPLICATE KEY UPDATE logsequence=logsequence+1";
+
protected VmRulesetLogDaoImpl() {
VmIdSearch = createSearchBuilder();
@@ -43,6 +53,39 @@ public class VmRulesetLogDaoImpl extends GenericDaoBase im
SearchCriteria sc = VmIdSearch.create();
sc.setParameters("vmId", vmId);
return findOneIncludingRemovedBy(sc);
+ }
+
+ @Override
+ public void createOrUpdate(Set workItems) {
+ Transaction txn = Transaction.currentTxn();
+ PreparedStatement stmtInsert = null;
+ int [] queryResult = null;
+ boolean success = true;
+ try {
+ stmtInsert = txn.prepareAutoCloseStatement(INSERT_OR_UPDATE);
+ for (Long vmId: workItems) {
+ stmtInsert.setLong(1, vmId);
+ stmtInsert.addBatch();
+ }
+ queryResult = stmtInsert.executeBatch();
+
+ txn.commit();
+ if (s_logger.isTraceEnabled())
+ s_logger.trace("Updated or inserted " + workItems.size() + " log items");
+ } catch (SQLException e) {
+ s_logger.warn("Failed to execute batch update statement for ruleset log");
+ success = false;
+ }
+ if (!success && queryResult != null) {
+ Long [] arrayItems = new Long[workItems.size()];
+ workItems.toArray(arrayItems);
+ for (int i=0; i < queryResult.length; i++) {
+ if (queryResult[i] < 0 ) {
+ s_logger.debug("Batch query update failed for vm " + arrayItems[i]);
+ }
+ }
+ }
+ return;
}