diff --git a/core/src/com/cloud/storage/template/DownloadManagerImpl.java b/core/src/com/cloud/storage/template/DownloadManagerImpl.java index f141a2610e9..7d1361bb48b 100755 --- a/core/src/com/cloud/storage/template/DownloadManagerImpl.java +++ b/core/src/com/cloud/storage/template/DownloadManagerImpl.java @@ -47,6 +47,7 @@ import com.cloud.exception.InternalErrorException; import com.cloud.storage.Storage.ImageFormat; import com.cloud.storage.StorageLayer; import com.cloud.storage.VMTemplateHostVO; +import com.cloud.storage.VMTemplateStorageResourceAssoc; import com.cloud.storage.template.Processor.FormatInfo; import com.cloud.storage.template.TemplateDownloader.DownloadCompleteCallback; import com.cloud.storage.template.TemplateDownloader.Status; @@ -58,7 +59,6 @@ import com.cloud.utils.component.ComponentLocator.ComponentInfo; import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.utils.script.OutputInterpreter; import com.cloud.utils.script.Script; -import com.cloud.storage.VMTemplateStorageResourceAssoc; /** * @author chiradeep @@ -180,8 +180,9 @@ public class DownloadManagerImpl implements DownloadManager { File f = new File(dnldPath); File dir = f.getParentFile(); f.delete(); - if (dir != null) + if (dir != null) { dir.delete(); + } } } @@ -194,7 +195,7 @@ public class DownloadManagerImpl implements DownloadManager { public long getTemplatesize() { return templatesize; } - + public void setTemplatePhysicalSize(long templatePhysicalSize) { this.templatePhysicalSize = templatePhysicalSize; } @@ -215,7 +216,7 @@ public class DownloadManagerImpl implements DownloadManager { private final Map jobs = new ConcurrentHashMap(); private String listTmpltScr; private int installTimeoutPerGig = 180 * 60 * 1000; - private boolean _sslCopy; + private boolean _sslCopy; /** * Get notified of change of job status. Executed in context of downloader thread @@ -282,7 +283,7 @@ public class DownloadManagerImpl implements DownloadManager { templatePath = dnld.getInstallPathPrefix() + dnld.getAccountId() + File.separator + dnld.getId() + File.separator;// dnld.getTmpltName(); _storage.mkdirs(templatePath); - + // once template path is set, remove the parent dir so that the template is installed with a relative path String finalTemplatePath = templatePath.substring(parentDir.length()); dnld.setTmpltPath(finalTemplatePath); @@ -309,8 +310,8 @@ public class DownloadManagerImpl implements DownloadManager { } else { templateName = java.util.UUID.nameUUIDFromBytes((jobs.get(jobId).getTmpltName() + System.currentTimeMillis()).getBytes()).toString(); } - - String templateFilename = templateName + "." + extension; + + String templateFilename = templateName + "." + extension; dnld.setTmpltPath(finalTemplatePath + "/" + templateFilename); scr.add("-n", templateFilename); @@ -326,11 +327,11 @@ public class DownloadManagerImpl implements DownloadManager { if (result != null) { return result; } - + // Set permissions for the downloaded template File downloadedTemplate = new File(templatePath + "/" + templateFilename); _storage.setWorldReadableAndWriteable(downloadedTemplate); - + // Set permissions for template.properties File templateProperties = new File(templatePath + "/template.properties"); _storage.setWorldReadableAndWriteable(templateProperties); @@ -343,18 +344,18 @@ public class DownloadManagerImpl implements DownloadManager { loc.purge(); return "Unable to download due to " + e.getMessage(); } - + Enumeration en = _processors.enumeration(); while (en.hasMoreElements()) { Processor processor = en.nextElement(); - + FormatInfo info = null; - try { - info = processor.process(templatePath, null, templateName); - } catch (InternalErrorException e) { - s_logger.error("Template process exception ", e); - return e.toString(); - } + try { + info = processor.process(templatePath, null, templateName); + } catch (InternalErrorException e) { + s_logger.error("Template process exception ", e); + return e.toString(); + } if (info != null) { loc.addFormat(info); dnld.setTemplatesize(info.virtualSize); @@ -362,12 +363,12 @@ public class DownloadManagerImpl implements DownloadManager { break; } } - + if (!loc.save()) { s_logger.warn("Cleaning up because we're unable to save the formats"); loc.purge(); } - + return null; } @@ -390,19 +391,19 @@ public class DownloadManagerImpl implements DownloadManager { String tmpDir = installPathPrefix + File.separator + accountId + File.separator + id; try { - + if (!_storage.mkdirs(tmpDir)) { s_logger.warn("Unable to create " + tmpDir); return "Unable to create " + tmpDir; } File file = _storage.getFile(tmpDir + File.separator + TemplateLocation.Filename); - + if (!file.createNewFile()) { s_logger.warn("Unable to create new file: " + file.getAbsolutePath()); return "Unable to create new file: " + file.getAbsolutePath(); } - + URI uri; try { uri = new URI(url); @@ -459,7 +460,7 @@ public class DownloadManagerImpl implements DownloadManager { } return 0; } - + public long getDownloadTemplatePhysicalSize(String jobId) { DownloadJob dj = jobs.get(jobId); if (dj != null) { @@ -534,10 +535,10 @@ public class DownloadManagerImpl implements DownloadManager { String user = null; String password = null; if (cmd.getAuth() != null) { - user = cmd.getAuth().getUserName(); - password = new String(cmd.getAuth().getPassword()); + user = cmd.getAuth().getUserName(); + password = new String(cmd.getAuth().getPassword()); } - + long maxDownloadSizeInBytes = (cmd.getMaxDownloadSizeInBytes() == null) ? TemplateDownloader.DEFAULT_MAX_TEMPLATE_SIZE_IN_BYTES : (cmd.getMaxDownloadSizeInBytes()); String jobId = downloadPublicTemplate(cmd.getId(), cmd.getUrl(), cmd.getName(), cmd.getFormat(), cmd.isHvm(), cmd.getAccountId(), cmd.getDescription(), cmd.getChecksum(), installPathPrefix, user, password, maxDownloadSizeInBytes); sleep(); @@ -560,8 +561,9 @@ public class DownloadManagerImpl implements DownloadManager { String jobId = cmd.getJobId(); DownloadAnswer answer; DownloadJob dj = null; - if (jobId != null) + if (jobId != null) { dj = jobs.get(jobId); + } if (dj == null) { if (cmd.getRequest() == RequestType.GET_OR_RESTART) { DownloadCommand dcmd = new DownloadCommand(cmd); @@ -585,14 +587,14 @@ public class DownloadManagerImpl implements DownloadManager { break; case PURGE: td.stopDownload(); - answer = new DownloadAnswer(jobId, getDownloadPct(jobId), getDownloadError(jobId), getDownloadStatus2(jobId), getDownloadLocalPath(jobId), + answer = new DownloadAnswer(jobId, getDownloadPct(jobId), getDownloadError(jobId), getDownloadStatus2(jobId), getDownloadLocalPath(jobId), getInstallPath(jobId), getDownloadTemplateSize(jobId), getDownloadTemplatePhysicalSize(jobId)); jobs.remove(jobId); return answer; default: break; // TODO } - return new DownloadAnswer(jobId, getDownloadPct(jobId), getDownloadError(jobId), getDownloadStatus2(jobId), getDownloadLocalPath(jobId), + return new DownloadAnswer(jobId, getDownloadPct(jobId), getDownloadError(jobId), getDownloadStatus2(jobId), getDownloadLocalPath(jobId), getInstallPath(jobId), getDownloadTemplateSize(jobId), getDownloadTemplatePhysicalSize(jobId)); } @@ -654,7 +656,7 @@ public class DownloadManagerImpl implements DownloadManager { } continue; } - + TemplateInfo tInfo = loc.getTemplateInfo(); result.put(tInfo.templateName, tInfo); @@ -670,7 +672,7 @@ public class DownloadManagerImpl implements DownloadManager { s_logger.debug("Added iso template name: " + tmpltName + ", path: " + tmplt); result.put(tmpltName, tInfo); } - */ + */ return result; } @@ -715,6 +717,11 @@ public class DownloadManagerImpl implements DownloadManager { public List getPaths() { return paths; } + + @Override + public boolean drain() { + return true; + } } public DownloadManagerImpl() { @@ -744,22 +751,22 @@ public class DownloadManagerImpl implements DownloadManager { } String useSsl = (String)params.get("sslcopy"); if (useSsl != null) { - _sslCopy = Boolean.parseBoolean(useSsl); - + _sslCopy = Boolean.parseBoolean(useSsl); + } configureFolders(name, params); String inSystemVM = (String)params.get("secondary.storage.vm"); if (inSystemVM != null && "true".equalsIgnoreCase(inSystemVM)) { - s_logger.info("DownloadManager: starting additional services since we are inside system vm"); - startAdditionalServices(); - blockOutgoingOnPrivate(); + s_logger.info("DownloadManager: starting additional services since we are inside system vm"); + startAdditionalServices(); + blockOutgoingOnPrivate(); } value = (String) params.get("install.timeout.pergig"); this.installTimeoutPerGig = NumbersUtil.parseInt(value, 15 * 60) * 1000; value = (String) params.get("install.numthreads"); - final int numInstallThreads = NumbersUtil.parseInt(value, 10); + final int numInstallThreads = NumbersUtil.parseInt(value, 10); String scriptsDir = (String) params.get("template.scripts.dir"); if (scriptsDir == null) { @@ -779,15 +786,15 @@ public class DownloadManagerImpl implements DownloadManager { s_logger.info("createtmplt.sh found in " + createTmpltScr); List> processors = new ArrayList>(); - + Processor processor = new VhdProcessor(); processor.configure("VHD Processor", params); processors.add(new ComponentInfo("VHD Processor", VhdProcessor.class, processor)); - + processor = new IsoProcessor(); processor.configure("ISO Processor", params); processors.add(new ComponentInfo("ISO Processor", IsoProcessor.class, processor)); - + processor = new QCOW2Processor(); processor.configure("QCOW2 Processor", params); processors.add(new ComponentInfo("QCOW2 Processor", QCOW2Processor.class, processor)); @@ -795,7 +802,7 @@ public class DownloadManagerImpl implements DownloadManager { processor = new VmdkProcessor(); processor.configure("VMDK Processor", params); processors.add(new ComponentInfo("VMDK Processor", VmdkProcessor.class, processor)); - + _processors = new Adapters("processors", processors); // Add more processors here. threadPool = Executors.newFixedThreadPool(numInstallThreads); @@ -803,20 +810,20 @@ public class DownloadManagerImpl implements DownloadManager { } private void blockOutgoingOnPrivate() { - Script command = new Script("/bin/bash", s_logger); - String intf = "eth1"; - command.add("-c"); - command.add("iptables -A OUTPUT -o " + intf + " -p tcp -m state --state NEW -m tcp --dport " + "80" + " -j REJECT;" + - "iptables -A OUTPUT -o " + intf + " -p tcp -m state --state NEW -m tcp --dport " + "443" + " -j REJECT;"); + Script command = new Script("/bin/bash", s_logger); + String intf = "eth1"; + command.add("-c"); + command.add("iptables -A OUTPUT -o " + intf + " -p tcp -m state --state NEW -m tcp --dport " + "80" + " -j REJECT;" + + "iptables -A OUTPUT -o " + intf + " -p tcp -m state --state NEW -m tcp --dport " + "443" + " -j REJECT;"); - String result = command.execute(); - if (result != null) { - s_logger.warn("Error in blocking outgoing to port 80/443 err=" + result ); - return; - } - } + String result = command.execute(); + if (result != null) { + s_logger.warn("Error in blocking outgoing to port 80/443 err=" + result ); + return; + } + } - protected void configureFolders(String name, Map params) throws ConfigurationException { + protected void configureFolders(String name, Map params) throws ConfigurationException { parentDir = (String) params.get("template.parent"); if (parentDir == null) { throw new ConfigurationException("Unable to find the parent root for the templates"); @@ -826,19 +833,19 @@ public class DownloadManagerImpl implements DownloadManager { if (value == null) { value = TemplateConstants.DEFAULT_TMPLT_ROOT_DIR; } - + if (value.startsWith(File.separator)) { publicTemplateRepo = value; } else { publicTemplateRepo = parentDir + File.separator + value; } - + if (!publicTemplateRepo.endsWith(File.separator)) { publicTemplateRepo += File.separator; } - + publicTemplateRepo += TemplateConstants.DEFAULT_TMPLT_FIRST_LEVEL_DIR; - + if (!_storage.mkdirs(publicTemplateRepo)) { throw new ConfigurationException("Unable to create public templates directory"); } @@ -858,54 +865,54 @@ public class DownloadManagerImpl implements DownloadManager { public boolean stop() { return true; } - - private void startAdditionalServices() { - - Script command = new Script("/bin/bash", s_logger); - command.add("-c"); - command.add("if [ -d /etc/apache2 ] ; then service apache2 stop; else service httpd stop; fi "); - String result = command.execute(); - if (result != null) { - s_logger.warn("Error in stopping httpd service err=" + result ); - } - String port = Integer.toString(TemplateConstants.DEFAULT_TMPLT_COPY_PORT); - String intf = TemplateConstants.DEFAULT_TMPLT_COPY_INTF; - - command = new Script("/bin/bash", s_logger); - command.add("-c"); - command.add("iptables -I INPUT -i " + intf + " -p tcp -m state --state NEW -m tcp --dport " + port + " -j ACCEPT;" + - "iptables -I INPUT -i " + intf + " -p tcp -m state --state NEW -m tcp --dport " + "443" + " -j ACCEPT;"); - result = command.execute(); - if (result != null) { - s_logger.warn("Error in opening up httpd port err=" + result ); - return; - } - - command = new Script("/bin/bash", s_logger); - command.add("-c"); - command.add("if [ -d /etc/apache2 ] ; then service apache2 start; else service httpd start; fi "); - result = command.execute(); - if (result != null) { - s_logger.warn("Error in starting httpd service err=" + result ); - return; - } - command = new Script("mkdir", s_logger); - command.add("-p"); - command.add("/var/www/html/copy/template"); - result = command.execute(); - if (result != null) { - s_logger.warn("Error in creating directory =" + result ); - return; - } - - command = new Script("/bin/bash", s_logger); - command.add("-c"); - command.add("ln -sf " + publicTemplateRepo + " /var/www/html/copy/template"); - result = command.execute(); - if (result != null) { - s_logger.warn("Error in linking err=" + result ); - return; - } - } + private void startAdditionalServices() { + + Script command = new Script("/bin/bash", s_logger); + command.add("-c"); + command.add("if [ -d /etc/apache2 ] ; then service apache2 stop; else service httpd stop; fi "); + String result = command.execute(); + if (result != null) { + s_logger.warn("Error in stopping httpd service err=" + result ); + } + String port = Integer.toString(TemplateConstants.DEFAULT_TMPLT_COPY_PORT); + String intf = TemplateConstants.DEFAULT_TMPLT_COPY_INTF; + + command = new Script("/bin/bash", s_logger); + command.add("-c"); + command.add("iptables -I INPUT -i " + intf + " -p tcp -m state --state NEW -m tcp --dport " + port + " -j ACCEPT;" + + "iptables -I INPUT -i " + intf + " -p tcp -m state --state NEW -m tcp --dport " + "443" + " -j ACCEPT;"); + + result = command.execute(); + if (result != null) { + s_logger.warn("Error in opening up httpd port err=" + result ); + return; + } + + command = new Script("/bin/bash", s_logger); + command.add("-c"); + command.add("if [ -d /etc/apache2 ] ; then service apache2 start; else service httpd start; fi "); + result = command.execute(); + if (result != null) { + s_logger.warn("Error in starting httpd service err=" + result ); + return; + } + command = new Script("mkdir", s_logger); + command.add("-p"); + command.add("/var/www/html/copy/template"); + result = command.execute(); + if (result != null) { + s_logger.warn("Error in creating directory =" + result ); + return; + } + + command = new Script("/bin/bash", s_logger); + command.add("-c"); + command.add("ln -sf " + publicTemplateRepo + " /var/www/html/copy/template"); + result = command.execute(); + if (result != null) { + s_logger.warn("Error in linking err=" + result ); + return; + } + } } diff --git a/server/src/com/cloud/agent/manager/ClusteredAgentAttache.java b/server/src/com/cloud/agent/manager/ClusteredAgentAttache.java index 789ea2815d0..736c78535cb 100644 --- a/server/src/com/cloud/agent/manager/ClusteredAgentAttache.java +++ b/server/src/com/cloud/agent/manager/ClusteredAgentAttache.java @@ -19,138 +19,139 @@ import com.cloud.utils.nio.Link; public class ClusteredAgentAttache extends ConnectedAgentAttache implements Routable { - private final static Logger s_logger = Logger.getLogger(ClusteredAgentAttache.class); - private static ClusteredAgentManagerImpl s_clusteredAgentMgr; - protected ByteBuffer _buffer = ByteBuffer.allocate(2048); - private boolean _forward = false; + private final static Logger s_logger = Logger.getLogger(ClusteredAgentAttache.class); + private static ClusteredAgentManagerImpl s_clusteredAgentMgr; + protected ByteBuffer _buffer = ByteBuffer.allocate(2048); + private boolean _forward = false; - static public void initialize(ClusteredAgentManagerImpl agentMgr) { - s_clusteredAgentMgr = agentMgr; - } + static public void initialize(ClusteredAgentManagerImpl agentMgr) { + s_clusteredAgentMgr = agentMgr; + } - public ClusteredAgentAttache(long id) { - super(id, null, false); - _forward = true; - } - - public ClusteredAgentAttache(long id, Link link, boolean maintenance) { - super(id, link, maintenance); - _forward = link == null; - } - - @Override - public boolean isClosed() { - return _forward ? false : super.isClosed(); - } - - @Override - public boolean forForward() { - return _forward; - } - - @Override + public ClusteredAgentAttache(long id) { + super(id, null, false); + _forward = true; + } + + public ClusteredAgentAttache(long id, Link link, boolean maintenance) { + super(id, link, maintenance); + _forward = link == null; + } + + @Override + public boolean isClosed() { + return _forward ? false : super.isClosed(); + } + + @Override + public boolean forForward() { + return _forward; + } + + @Override public void cancel(long seq) { - if (forForward()) { - Listener listener = getListener(seq); - if (listener != null && listener instanceof SynchronousListener) { - SynchronousListener synchronous = (SynchronousListener)listener; - String peerName = synchronous.getPeer(); - if (peerName != null) { - s_logger.debug(log(seq, "Forwarding to peer to cancel due to timeout")); - s_clusteredAgentMgr.cancel(peerName, _id, seq, "Timed Out"); - } - } - } - - super.cancel(seq); - } - - @Override - public void routeToAgent(byte[] data) throws AgentUnavailableException { - if (s_logger.isDebugEnabled()) { - s_logger.debug(log(Request.getSequence(data), "Routing from " + Request.getManagementServerId(data))); - } - - if (_link == null) { - if (s_logger.isDebugEnabled()) { - s_logger.debug(log(Request.getSequence(data), "Link is closed")); - } - } - - try { - _link.send(data); - } catch (ClosedChannelException e) { - if (s_logger.isDebugEnabled()) { - s_logger.debug(log(Request.getSequence(data), "Channel is closed")); - } - - throw new AgentUnavailableException("Channel to agent is closed", _id); - } catch (NullPointerException e) { + if (forForward()) { + Listener listener = getListener(seq); + if (listener != null && listener instanceof SynchronousListener) { + SynchronousListener synchronous = (SynchronousListener)listener; + String peerName = synchronous.getPeer(); + if (peerName != null) { + s_logger.debug(log(seq, "Forwarding to peer to cancel due to timeout")); + s_clusteredAgentMgr.cancel(peerName, _id, seq, "Timed Out"); + } + } + } + + super.cancel(seq); + } + + @Override + public void routeToAgent(byte[] data) throws AgentUnavailableException { + if (s_logger.isDebugEnabled()) { + s_logger.debug(log(Request.getSequence(data), "Routing from " + Request.getManagementServerId(data))); + } + + if (_link == null) { if (s_logger.isDebugEnabled()) { s_logger.debug(log(Request.getSequence(data), "Link is closed")); } - // Note: since this block is not in synchronized. It is possible for _link to become null. + throw new AgentUnavailableException("Link to agent is closed", _id); + } + + try { + _link.send(data); + } catch (ClosedChannelException e) { + if (s_logger.isDebugEnabled()) { + s_logger.debug(log(Request.getSequence(data), "Channel is closed")); + } + throw new AgentUnavailableException("Channel to agent is closed", _id); - } - } - - @Override - public void send(Request req, Listener listener) throws AgentUnavailableException { - if (_link != null) { - super.send(req, listener); - return; - } - - long seq = req.getSequence(); - - if (listener != null) { - registerListener(req.getSequence(), listener); - } - - int i = 0; - SocketChannel ch = null; - boolean error = true; - try { - while (i++ < 5) { - String peerName = s_clusteredAgentMgr.findPeer(_id); - if (peerName == null) { - throw new AgentUnavailableException("Unable to find peer", _id); - } - - ch = s_clusteredAgentMgr.connectToPeer(peerName, ch); - if (ch == null) { - if (s_logger.isDebugEnabled()) { - s_logger.debug(log(seq, "Unable to forward " + req.toString())); - } - continue; - } - - try { - if (s_logger.isDebugEnabled()) { - s_logger.debug(log(seq, "Forwarding " + req.toString() + " to " + peerName)); - } - if (req.executeInSequence() && listener != null && listener instanceof SynchronousListener) { - SynchronousListener synchronous = (SynchronousListener)listener; - synchronous.setPeer(peerName); - } - Link.write(ch, req.toBytes()); - error = false; - return; - } catch (IOException e) { - if (s_logger.isDebugEnabled()) { - s_logger.debug(log(seq, "Error on connecting to management node: " + req.toString() + " try = " + i)); - } - - if(s_logger.isInfoEnabled()) { + } catch (NullPointerException e) { + if (s_logger.isDebugEnabled()) { + s_logger.debug(log(Request.getSequence(data), "Link is closed")); + } + // Note: since this block is not in synchronized. It is possible for _link to become null. + throw new AgentUnavailableException("Unable to send", _id); + } + } + + @Override + public void send(Request req, Listener listener) throws AgentUnavailableException { + if (_link != null) { + super.send(req, listener); + return; + } + + long seq = req.getSequence(); + + if (listener != null) { + registerListener(req.getSequence(), listener); + } + + int i = 0; + SocketChannel ch = null; + boolean error = true; + try { + while (i++ < 5) { + String peerName = s_clusteredAgentMgr.findPeer(_id); + if (peerName == null) { + throw new AgentUnavailableException("Unable to find peer", _id); + } + + ch = s_clusteredAgentMgr.connectToPeer(peerName, ch); + if (ch == null) { + if (s_logger.isDebugEnabled()) { + s_logger.debug(log(seq, "Unable to forward " + req.toString())); + } + continue; + } + + try { + if (s_logger.isDebugEnabled()) { + s_logger.debug(log(seq, "Forwarding " + req.toString() + " to " + peerName)); + } + if (req.executeInSequence() && listener != null && listener instanceof SynchronousListener) { + SynchronousListener synchronous = (SynchronousListener)listener; + synchronous.setPeer(peerName); + } + Link.write(ch, req.toBytes()); + error = false; + return; + } catch (IOException e) { + if (s_logger.isDebugEnabled()) { + s_logger.debug(log(seq, "Error on connecting to management node: " + req.toString() + " try = " + i)); + } + + if(s_logger.isInfoEnabled()) { s_logger.info("IOException " + e.getMessage() + " when sending data to peer " + peerName + ", close peer connection and let it re-open"); } - } - } - } finally { - if (error) { - unregisterListener(seq); - } - } - throw new AgentUnavailableException("Unable to reach the peer that the agent is connected", _id); - } + } + } + } finally { + if (error) { + unregisterListener(seq); + } + } + throw new AgentUnavailableException("Unable to reach the peer that the agent is connected", _id); + } }