/** * Copyright (C) 2011 Citrix Systems, Inc. All rights reserved. * * This software is licensed under the GNU General Public License v3 or later. * * It is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or any later version. * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . * */ package com.cloud.network.security; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import javax.ejb.Local; import javax.naming.ConfigurationException; import com.cloud.agent.api.SecurityIngressRulesCmd; import com.cloud.agent.manager.Commands; import com.cloud.configuration.Config; import com.cloud.exception.AgentUnavailableException; import com.cloud.network.security.SecurityGroupWork.Step; import com.cloud.uservm.UserVm; import com.cloud.utils.NumbersUtil; import com.cloud.utils.Profiler; import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.utils.mgmt.JmxUtil; import com.cloud.vm.VirtualMachine.State; /** * Same as the base class -- except it uses the abstracted security group work queue * */ @Local(value={ SecurityGroupManager.class, SecurityGroupService.class }) public class SecurityGroupManagerImpl2 extends SecurityGroupManagerImpl{ SecurityGroupWorkQueue _workQueue = new LocalSecurityGroupWorkQueue(); SecurityGroupWorkTracker _workTracker; SecurityManagerMBeanImpl _mBean; WorkerThread[] _workers; private Set _disabledVms = Collections.newSetFromMap(new ConcurrentHashMap()); private boolean _schedulerDisabled = false; protected class WorkerThread extends Thread { public WorkerThread(String name) { super(name); } @Override public void run() { while (true) { try{ work(); } catch (final Throwable th) { s_logger.error("SG Work: Caught this throwable, ", th); } } } } @Override protected void createThreadPools() { _workers = new WorkerThread[_numWorkerThreads]; for (int i = 0; i < _workers.length; i++) { _workers[i] = new WorkerThread("SecGrp-Worker-" + i); } } @Override //@DB public void scheduleRulesetUpdateToHosts(List affectedVms, boolean updateSeqno, Long delayMs) { if (affectedVms.size() == 0) { return; } if (_schedulerDisabled) { s_logger.debug("Security Group Mgr v2: scheduler disabled, doing nothing for " + affectedVms.size() + " vms"); return; } Set workItems = new TreeSet(); workItems.addAll(affectedVms); workItems.removeAll(_disabledVms); if (s_logger.isDebugEnabled()) { s_logger.debug("Security Group Mgr v2: scheduling ruleset updates for " + affectedVms.size() + " vms " + " (unique=" + workItems.size() + "), current queue size=" + _workQueue.size()); } Profiler p = new Profiler(); p.start(); int updated = 0; if (updateSeqno) { updated = _rulesetLogDao.createOrUpdate(workItems); if (updated < workItems.size()) { throw new CloudRuntimeException("Failed to create ruleset log entries"); } } int newJobs = _workQueue.submitWorkForVms(workItems); _mBean.logScheduledDetails(workItems); p.stop(); if (s_logger.isDebugEnabled()){ s_logger.debug("Security Group Mgr v2: done scheduling ruleset updates for " + workItems.size() + " vms: num new jobs=" + newJobs + " num rows insert or updated=" + updated + " time taken=" + p.getDuration()); } } @Override public boolean start() { for (final WorkerThread thread : _workers) { thread.start(); } return true; } @Override public void work() { s_logger.trace("Checking the work queue"); List workItems; try { workItems = _workQueue.getWork(1); for (SecurityGroupWork work: workItems) { if (s_logger.isTraceEnabled()) { s_logger.trace("Processing " + work.getInstanceId()); } try { VmRulesetLogVO rulesetLog = _rulesetLogDao.findByVmId(work.getInstanceId()); if (rulesetLog == null) { s_logger.warn("Could not find ruleset log for vm " + work.getInstanceId()); continue; } work.setLogsequenceNumber(rulesetLog.getLogsequence()); sendRulesetUpdates(work); _mBean.logUpdateDetails(work.getInstanceId(), work.getLogsequenceNumber()); }catch (Exception e) { s_logger.error("Problem during SG work " + work, e); work.setStep(Step.Error); } } } catch (InterruptedException e1) { s_logger.warn("SG work: caught InterruptException", e1); } } public void sendRulesetUpdates(SecurityGroupWork work){ Long userVmId = work.getInstanceId(); UserVm vm = _userVMDao.findById(userVmId); if (vm != null && vm.getState() == State.Running) { if (s_logger.isTraceEnabled()) { s_logger.trace("SecurityGroupManager v2: found vm, " + userVmId + " state=" + vm.getState()); } Long agentId = vm.getHostId(); if (agentId != null) { if ( !_workTracker.canSend(agentId)) { s_logger.trace("SecurityGroupManager v2: not sending ruleset update: too many messages outstanding"); return; } Map> rules = generateRulesForVM(userVmId); SecurityIngressRulesCmd cmd = generateRulesetCmd(vm.getInstanceName(), vm.getPrivateIpAddress(), vm.getPrivateMacAddress(), vm.getId(), null, work.getLogsequenceNumber(), rules); cmd.setMsId(_serverId); if (s_logger.isDebugEnabled()) { s_logger.debug("SecurityGroupManager v2: sending ruleset update for vm " + vm.getInstanceName() + ": num rules=" + cmd.getRuleSet().length + " num cidrs=" + cmd.getTotalNumCidrs() + " sig=" + cmd.getSignature()); } Commands cmds = new Commands(cmd); try { _agentMgr.send(agentId, cmds, _answerListener); if (s_logger.isTraceEnabled()) { s_logger.trace("SecurityGroupManager v2: sent ruleset updates for " + vm.getInstanceName() + " curr queue size=" + _workQueue.size()); } } catch (AgentUnavailableException e) { s_logger.debug("Unable to send updates for vm: " + userVmId + "(agentid=" + agentId + ")"); _workTracker.handleException(agentId); } } } else { if (s_logger.isDebugEnabled()) { if (vm != null) s_logger.debug("No rules sent to vm " + vm + "state=" + vm.getState()); else s_logger.debug("Could not find vm: No rules sent to vm " + userVmId ); } } } @Override public void cleanupFinishedWork() { //TODO: over time clean up op_vm_ruleset_log table for destroyed vms } /* * Same as the superclass, except that we use the ip address(es) returned from the join * made with the nics table when retrieving the SecurityGroupVmMapVO. If a vm has a single * nic then that nic is the default and then this query is correct. If the vm has multiple nics * then we get all ips, including the default nic ip. This is also probably the correct behavior. */ @Override protected Map> generateRulesForVM(Long userVmId) { Map> allowed = new TreeMap>(); List groupsForVm = _securityGroupVMMapDao.listByInstanceId(userVmId); for (SecurityGroupVMMapVO mapVO : groupsForVm) { List rules = _ingressRuleDao.listBySecurityGroupId(mapVO.getSecurityGroupId()); for (IngressRuleVO rule : rules) { PortAndProto portAndProto = new PortAndProto(rule.getProtocol(), rule.getStartPort(), rule.getEndPort()); Set cidrs = allowed.get(portAndProto); if (cidrs == null) { cidrs = new TreeSet(new CidrComparator()); } if (rule.getAllowedNetworkId() != null) { List allowedInstances = _securityGroupVMMapDao.listBySecurityGroup(rule.getAllowedNetworkId(), State.Running); for (SecurityGroupVMMapVO ngmapVO : allowedInstances) { //here, we differ from the superclass: instead of creating N more queries to the //nics table, we use what's already there in the VO since the listBySecurityGroup already //did a join with the nics table String cidr = ngmapVO.getGuestIpAddress() + "/32"; cidrs.add(cidr); } } else if (rule.getAllowedSourceIpCidr() != null) { cidrs.add(rule.getAllowedSourceIpCidr()); } if (cidrs.size() > 0) { allowed.put(portAndProto, cidrs); } } } return allowed; } public int getQueueSize() { return _workQueue.size(); } public SecurityGroupWorkQueue getWorkQueue() { return _workQueue; } @Override public boolean configure(String name, Map params) throws ConfigurationException { _mBean = new SecurityManagerMBeanImpl(this); try { JmxUtil.registerMBean("SecurityGroupManager", "SecurityGroupManagerImpl2", _mBean); } catch (Exception e){ s_logger.error("Failed to register MBean", e); } boolean result = super.configure(name, params); Map configs = _configDao.getConfiguration("Network", params); int bufferLength = NumbersUtil.parseInt(configs.get(Config.SecurityGroupWorkPerAgentMaxQueueSize.key()), 100); _workTracker = new SecurityGroupWorkTracker(_agentMgr, _answerListener, bufferLength); _answerListener.setWorkDispatcher(_workTracker); return result; } public void disableSchedulerForVm(Long vmId, boolean disable) { if (disable) { _disabledVms.add(vmId); } else { _disabledVms.remove(vmId); } s_logger.warn("JMX operation: Scheduler state for vm " + vmId + ": new state disabled=" + disable); } public Long[] getDisabledVmsForScheduler() { Long [] result = new Long[_disabledVms.size()]; return _disabledVms.toArray(result ); } public void enableAllVmsForScheduler() { s_logger.warn("Cleared list of disabled VMs (JMX operation?)"); _disabledVms.clear(); } public void disableScheduler(boolean disable) { _schedulerDisabled = disable; s_logger.warn("JMX operation: Scheduler state changed: new state disabled=" + disable); } public boolean isSchedulerDisabled() { return _schedulerDisabled; } public void clearWorkQueue() { _workQueue.clear(); s_logger.warn("Cleared the work queue (possible JMX operation)"); } }