mirror of https://github.com/apache/cloudstack.git
fixed problem with templates being too long
This commit is contained in:
parent
0be5965a90
commit
fa5797b774
|
|
@ -47,6 +47,7 @@ import com.cloud.exception.InternalErrorException;
|
|||
import com.cloud.storage.Storage.ImageFormat;
|
||||
import com.cloud.storage.StorageLayer;
|
||||
import com.cloud.storage.VMTemplateHostVO;
|
||||
import com.cloud.storage.VMTemplateStorageResourceAssoc;
|
||||
import com.cloud.storage.template.Processor.FormatInfo;
|
||||
import com.cloud.storage.template.TemplateDownloader.DownloadCompleteCallback;
|
||||
import com.cloud.storage.template.TemplateDownloader.Status;
|
||||
|
|
@ -58,7 +59,6 @@ import com.cloud.utils.component.ComponentLocator.ComponentInfo;
|
|||
import com.cloud.utils.exception.CloudRuntimeException;
|
||||
import com.cloud.utils.script.OutputInterpreter;
|
||||
import com.cloud.utils.script.Script;
|
||||
import com.cloud.storage.VMTemplateStorageResourceAssoc;
|
||||
|
||||
/**
|
||||
* @author chiradeep
|
||||
|
|
@ -180,8 +180,9 @@ public class DownloadManagerImpl implements DownloadManager {
|
|||
File f = new File(dnldPath);
|
||||
File dir = f.getParentFile();
|
||||
f.delete();
|
||||
if (dir != null)
|
||||
if (dir != null) {
|
||||
dir.delete();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -194,7 +195,7 @@ public class DownloadManagerImpl implements DownloadManager {
|
|||
public long getTemplatesize() {
|
||||
return templatesize;
|
||||
}
|
||||
|
||||
|
||||
public void setTemplatePhysicalSize(long templatePhysicalSize) {
|
||||
this.templatePhysicalSize = templatePhysicalSize;
|
||||
}
|
||||
|
|
@ -215,7 +216,7 @@ public class DownloadManagerImpl implements DownloadManager {
|
|||
private final Map<String, DownloadJob> jobs = new ConcurrentHashMap<String, DownloadJob>();
|
||||
private String listTmpltScr;
|
||||
private int installTimeoutPerGig = 180 * 60 * 1000;
|
||||
private boolean _sslCopy;
|
||||
private boolean _sslCopy;
|
||||
|
||||
/**
|
||||
* Get notified of change of job status. Executed in context of downloader thread
|
||||
|
|
@ -282,7 +283,7 @@ public class DownloadManagerImpl implements DownloadManager {
|
|||
templatePath = dnld.getInstallPathPrefix() + dnld.getAccountId() + File.separator + dnld.getId() + File.separator;// dnld.getTmpltName();
|
||||
|
||||
_storage.mkdirs(templatePath);
|
||||
|
||||
|
||||
// once template path is set, remove the parent dir so that the template is installed with a relative path
|
||||
String finalTemplatePath = templatePath.substring(parentDir.length());
|
||||
dnld.setTmpltPath(finalTemplatePath);
|
||||
|
|
@ -309,8 +310,8 @@ public class DownloadManagerImpl implements DownloadManager {
|
|||
} else {
|
||||
templateName = java.util.UUID.nameUUIDFromBytes((jobs.get(jobId).getTmpltName() + System.currentTimeMillis()).getBytes()).toString();
|
||||
}
|
||||
|
||||
String templateFilename = templateName + "." + extension;
|
||||
|
||||
String templateFilename = templateName + "." + extension;
|
||||
dnld.setTmpltPath(finalTemplatePath + "/" + templateFilename);
|
||||
scr.add("-n", templateFilename);
|
||||
|
||||
|
|
@ -326,11 +327,11 @@ public class DownloadManagerImpl implements DownloadManager {
|
|||
if (result != null) {
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
// Set permissions for the downloaded template
|
||||
File downloadedTemplate = new File(templatePath + "/" + templateFilename);
|
||||
_storage.setWorldReadableAndWriteable(downloadedTemplate);
|
||||
|
||||
|
||||
// Set permissions for template.properties
|
||||
File templateProperties = new File(templatePath + "/template.properties");
|
||||
_storage.setWorldReadableAndWriteable(templateProperties);
|
||||
|
|
@ -343,18 +344,18 @@ public class DownloadManagerImpl implements DownloadManager {
|
|||
loc.purge();
|
||||
return "Unable to download due to " + e.getMessage();
|
||||
}
|
||||
|
||||
|
||||
Enumeration<Processor> en = _processors.enumeration();
|
||||
while (en.hasMoreElements()) {
|
||||
Processor processor = en.nextElement();
|
||||
|
||||
|
||||
FormatInfo info = null;
|
||||
try {
|
||||
info = processor.process(templatePath, null, templateName);
|
||||
} catch (InternalErrorException e) {
|
||||
s_logger.error("Template process exception ", e);
|
||||
return e.toString();
|
||||
}
|
||||
try {
|
||||
info = processor.process(templatePath, null, templateName);
|
||||
} catch (InternalErrorException e) {
|
||||
s_logger.error("Template process exception ", e);
|
||||
return e.toString();
|
||||
}
|
||||
if (info != null) {
|
||||
loc.addFormat(info);
|
||||
dnld.setTemplatesize(info.virtualSize);
|
||||
|
|
@ -362,12 +363,12 @@ public class DownloadManagerImpl implements DownloadManager {
|
|||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (!loc.save()) {
|
||||
s_logger.warn("Cleaning up because we're unable to save the formats");
|
||||
loc.purge();
|
||||
}
|
||||
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
@ -390,19 +391,19 @@ public class DownloadManagerImpl implements DownloadManager {
|
|||
String tmpDir = installPathPrefix + File.separator + accountId + File.separator + id;
|
||||
|
||||
try {
|
||||
|
||||
|
||||
if (!_storage.mkdirs(tmpDir)) {
|
||||
s_logger.warn("Unable to create " + tmpDir);
|
||||
return "Unable to create " + tmpDir;
|
||||
}
|
||||
|
||||
File file = _storage.getFile(tmpDir + File.separator + TemplateLocation.Filename);
|
||||
|
||||
|
||||
if (!file.createNewFile()) {
|
||||
s_logger.warn("Unable to create new file: " + file.getAbsolutePath());
|
||||
return "Unable to create new file: " + file.getAbsolutePath();
|
||||
}
|
||||
|
||||
|
||||
URI uri;
|
||||
try {
|
||||
uri = new URI(url);
|
||||
|
|
@ -459,7 +460,7 @@ public class DownloadManagerImpl implements DownloadManager {
|
|||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
public long getDownloadTemplatePhysicalSize(String jobId) {
|
||||
DownloadJob dj = jobs.get(jobId);
|
||||
if (dj != null) {
|
||||
|
|
@ -534,10 +535,10 @@ public class DownloadManagerImpl implements DownloadManager {
|
|||
String user = null;
|
||||
String password = null;
|
||||
if (cmd.getAuth() != null) {
|
||||
user = cmd.getAuth().getUserName();
|
||||
password = new String(cmd.getAuth().getPassword());
|
||||
user = cmd.getAuth().getUserName();
|
||||
password = new String(cmd.getAuth().getPassword());
|
||||
}
|
||||
|
||||
|
||||
long maxDownloadSizeInBytes = (cmd.getMaxDownloadSizeInBytes() == null) ? TemplateDownloader.DEFAULT_MAX_TEMPLATE_SIZE_IN_BYTES : (cmd.getMaxDownloadSizeInBytes());
|
||||
String jobId = downloadPublicTemplate(cmd.getId(), cmd.getUrl(), cmd.getName(), cmd.getFormat(), cmd.isHvm(), cmd.getAccountId(), cmd.getDescription(), cmd.getChecksum(), installPathPrefix, user, password, maxDownloadSizeInBytes);
|
||||
sleep();
|
||||
|
|
@ -560,8 +561,9 @@ public class DownloadManagerImpl implements DownloadManager {
|
|||
String jobId = cmd.getJobId();
|
||||
DownloadAnswer answer;
|
||||
DownloadJob dj = null;
|
||||
if (jobId != null)
|
||||
if (jobId != null) {
|
||||
dj = jobs.get(jobId);
|
||||
}
|
||||
if (dj == null) {
|
||||
if (cmd.getRequest() == RequestType.GET_OR_RESTART) {
|
||||
DownloadCommand dcmd = new DownloadCommand(cmd);
|
||||
|
|
@ -585,14 +587,14 @@ public class DownloadManagerImpl implements DownloadManager {
|
|||
break;
|
||||
case PURGE:
|
||||
td.stopDownload();
|
||||
answer = new DownloadAnswer(jobId, getDownloadPct(jobId), getDownloadError(jobId), getDownloadStatus2(jobId), getDownloadLocalPath(jobId),
|
||||
answer = new DownloadAnswer(jobId, getDownloadPct(jobId), getDownloadError(jobId), getDownloadStatus2(jobId), getDownloadLocalPath(jobId),
|
||||
getInstallPath(jobId), getDownloadTemplateSize(jobId), getDownloadTemplatePhysicalSize(jobId));
|
||||
jobs.remove(jobId);
|
||||
return answer;
|
||||
default:
|
||||
break; // TODO
|
||||
}
|
||||
return new DownloadAnswer(jobId, getDownloadPct(jobId), getDownloadError(jobId), getDownloadStatus2(jobId), getDownloadLocalPath(jobId),
|
||||
return new DownloadAnswer(jobId, getDownloadPct(jobId), getDownloadError(jobId), getDownloadStatus2(jobId), getDownloadLocalPath(jobId),
|
||||
getInstallPath(jobId), getDownloadTemplateSize(jobId), getDownloadTemplatePhysicalSize(jobId));
|
||||
}
|
||||
|
||||
|
|
@ -654,7 +656,7 @@ public class DownloadManagerImpl implements DownloadManager {
|
|||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
TemplateInfo tInfo = loc.getTemplateInfo();
|
||||
|
||||
result.put(tInfo.templateName, tInfo);
|
||||
|
|
@ -670,7 +672,7 @@ public class DownloadManagerImpl implements DownloadManager {
|
|||
s_logger.debug("Added iso template name: " + tmpltName + ", path: " + tmplt);
|
||||
result.put(tmpltName, tInfo);
|
||||
}
|
||||
*/
|
||||
*/
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
@ -715,6 +717,11 @@ public class DownloadManagerImpl implements DownloadManager {
|
|||
public List<String> getPaths() {
|
||||
return paths;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean drain() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
public DownloadManagerImpl() {
|
||||
|
|
@ -744,22 +751,22 @@ public class DownloadManagerImpl implements DownloadManager {
|
|||
}
|
||||
String useSsl = (String)params.get("sslcopy");
|
||||
if (useSsl != null) {
|
||||
_sslCopy = Boolean.parseBoolean(useSsl);
|
||||
|
||||
_sslCopy = Boolean.parseBoolean(useSsl);
|
||||
|
||||
}
|
||||
configureFolders(name, params);
|
||||
String inSystemVM = (String)params.get("secondary.storage.vm");
|
||||
if (inSystemVM != null && "true".equalsIgnoreCase(inSystemVM)) {
|
||||
s_logger.info("DownloadManager: starting additional services since we are inside system vm");
|
||||
startAdditionalServices();
|
||||
blockOutgoingOnPrivate();
|
||||
s_logger.info("DownloadManager: starting additional services since we are inside system vm");
|
||||
startAdditionalServices();
|
||||
blockOutgoingOnPrivate();
|
||||
}
|
||||
|
||||
value = (String) params.get("install.timeout.pergig");
|
||||
this.installTimeoutPerGig = NumbersUtil.parseInt(value, 15 * 60) * 1000;
|
||||
|
||||
value = (String) params.get("install.numthreads");
|
||||
final int numInstallThreads = NumbersUtil.parseInt(value, 10);
|
||||
final int numInstallThreads = NumbersUtil.parseInt(value, 10);
|
||||
|
||||
String scriptsDir = (String) params.get("template.scripts.dir");
|
||||
if (scriptsDir == null) {
|
||||
|
|
@ -779,15 +786,15 @@ public class DownloadManagerImpl implements DownloadManager {
|
|||
s_logger.info("createtmplt.sh found in " + createTmpltScr);
|
||||
|
||||
List<ComponentInfo<Adapter>> processors = new ArrayList<ComponentInfo<Adapter>>();
|
||||
|
||||
|
||||
Processor processor = new VhdProcessor();
|
||||
processor.configure("VHD Processor", params);
|
||||
processors.add(new ComponentInfo<Adapter>("VHD Processor", VhdProcessor.class, processor));
|
||||
|
||||
|
||||
processor = new IsoProcessor();
|
||||
processor.configure("ISO Processor", params);
|
||||
processors.add(new ComponentInfo<Adapter>("ISO Processor", IsoProcessor.class, processor));
|
||||
|
||||
|
||||
processor = new QCOW2Processor();
|
||||
processor.configure("QCOW2 Processor", params);
|
||||
processors.add(new ComponentInfo<Adapter>("QCOW2 Processor", QCOW2Processor.class, processor));
|
||||
|
|
@ -795,7 +802,7 @@ public class DownloadManagerImpl implements DownloadManager {
|
|||
processor = new VmdkProcessor();
|
||||
processor.configure("VMDK Processor", params);
|
||||
processors.add(new ComponentInfo<Adapter>("VMDK Processor", VmdkProcessor.class, processor));
|
||||
|
||||
|
||||
_processors = new Adapters<Processor>("processors", processors);
|
||||
// Add more processors here.
|
||||
threadPool = Executors.newFixedThreadPool(numInstallThreads);
|
||||
|
|
@ -803,20 +810,20 @@ public class DownloadManagerImpl implements DownloadManager {
|
|||
}
|
||||
|
||||
private void blockOutgoingOnPrivate() {
|
||||
Script command = new Script("/bin/bash", s_logger);
|
||||
String intf = "eth1";
|
||||
command.add("-c");
|
||||
command.add("iptables -A OUTPUT -o " + intf + " -p tcp -m state --state NEW -m tcp --dport " + "80" + " -j REJECT;" +
|
||||
"iptables -A OUTPUT -o " + intf + " -p tcp -m state --state NEW -m tcp --dport " + "443" + " -j REJECT;");
|
||||
Script command = new Script("/bin/bash", s_logger);
|
||||
String intf = "eth1";
|
||||
command.add("-c");
|
||||
command.add("iptables -A OUTPUT -o " + intf + " -p tcp -m state --state NEW -m tcp --dport " + "80" + " -j REJECT;" +
|
||||
"iptables -A OUTPUT -o " + intf + " -p tcp -m state --state NEW -m tcp --dport " + "443" + " -j REJECT;");
|
||||
|
||||
String result = command.execute();
|
||||
if (result != null) {
|
||||
s_logger.warn("Error in blocking outgoing to port 80/443 err=" + result );
|
||||
return;
|
||||
}
|
||||
}
|
||||
String result = command.execute();
|
||||
if (result != null) {
|
||||
s_logger.warn("Error in blocking outgoing to port 80/443 err=" + result );
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
protected void configureFolders(String name, Map<String, Object> params) throws ConfigurationException {
|
||||
protected void configureFolders(String name, Map<String, Object> params) throws ConfigurationException {
|
||||
parentDir = (String) params.get("template.parent");
|
||||
if (parentDir == null) {
|
||||
throw new ConfigurationException("Unable to find the parent root for the templates");
|
||||
|
|
@ -826,19 +833,19 @@ public class DownloadManagerImpl implements DownloadManager {
|
|||
if (value == null) {
|
||||
value = TemplateConstants.DEFAULT_TMPLT_ROOT_DIR;
|
||||
}
|
||||
|
||||
|
||||
if (value.startsWith(File.separator)) {
|
||||
publicTemplateRepo = value;
|
||||
} else {
|
||||
publicTemplateRepo = parentDir + File.separator + value;
|
||||
}
|
||||
|
||||
|
||||
if (!publicTemplateRepo.endsWith(File.separator)) {
|
||||
publicTemplateRepo += File.separator;
|
||||
}
|
||||
|
||||
|
||||
publicTemplateRepo += TemplateConstants.DEFAULT_TMPLT_FIRST_LEVEL_DIR;
|
||||
|
||||
|
||||
if (!_storage.mkdirs(publicTemplateRepo)) {
|
||||
throw new ConfigurationException("Unable to create public templates directory");
|
||||
}
|
||||
|
|
@ -858,54 +865,54 @@ public class DownloadManagerImpl implements DownloadManager {
|
|||
public boolean stop() {
|
||||
return true;
|
||||
}
|
||||
|
||||
private void startAdditionalServices() {
|
||||
|
||||
Script command = new Script("/bin/bash", s_logger);
|
||||
command.add("-c");
|
||||
command.add("if [ -d /etc/apache2 ] ; then service apache2 stop; else service httpd stop; fi ");
|
||||
String result = command.execute();
|
||||
if (result != null) {
|
||||
s_logger.warn("Error in stopping httpd service err=" + result );
|
||||
}
|
||||
String port = Integer.toString(TemplateConstants.DEFAULT_TMPLT_COPY_PORT);
|
||||
String intf = TemplateConstants.DEFAULT_TMPLT_COPY_INTF;
|
||||
|
||||
command = new Script("/bin/bash", s_logger);
|
||||
command.add("-c");
|
||||
command.add("iptables -I INPUT -i " + intf + " -p tcp -m state --state NEW -m tcp --dport " + port + " -j ACCEPT;" +
|
||||
"iptables -I INPUT -i " + intf + " -p tcp -m state --state NEW -m tcp --dport " + "443" + " -j ACCEPT;");
|
||||
|
||||
result = command.execute();
|
||||
if (result != null) {
|
||||
s_logger.warn("Error in opening up httpd port err=" + result );
|
||||
return;
|
||||
}
|
||||
|
||||
command = new Script("/bin/bash", s_logger);
|
||||
command.add("-c");
|
||||
command.add("if [ -d /etc/apache2 ] ; then service apache2 start; else service httpd start; fi ");
|
||||
result = command.execute();
|
||||
if (result != null) {
|
||||
s_logger.warn("Error in starting httpd service err=" + result );
|
||||
return;
|
||||
}
|
||||
command = new Script("mkdir", s_logger);
|
||||
command.add("-p");
|
||||
command.add("/var/www/html/copy/template");
|
||||
result = command.execute();
|
||||
if (result != null) {
|
||||
s_logger.warn("Error in creating directory =" + result );
|
||||
return;
|
||||
}
|
||||
|
||||
command = new Script("/bin/bash", s_logger);
|
||||
command.add("-c");
|
||||
command.add("ln -sf " + publicTemplateRepo + " /var/www/html/copy/template");
|
||||
result = command.execute();
|
||||
if (result != null) {
|
||||
s_logger.warn("Error in linking err=" + result );
|
||||
return;
|
||||
}
|
||||
}
|
||||
private void startAdditionalServices() {
|
||||
|
||||
Script command = new Script("/bin/bash", s_logger);
|
||||
command.add("-c");
|
||||
command.add("if [ -d /etc/apache2 ] ; then service apache2 stop; else service httpd stop; fi ");
|
||||
String result = command.execute();
|
||||
if (result != null) {
|
||||
s_logger.warn("Error in stopping httpd service err=" + result );
|
||||
}
|
||||
String port = Integer.toString(TemplateConstants.DEFAULT_TMPLT_COPY_PORT);
|
||||
String intf = TemplateConstants.DEFAULT_TMPLT_COPY_INTF;
|
||||
|
||||
command = new Script("/bin/bash", s_logger);
|
||||
command.add("-c");
|
||||
command.add("iptables -I INPUT -i " + intf + " -p tcp -m state --state NEW -m tcp --dport " + port + " -j ACCEPT;" +
|
||||
"iptables -I INPUT -i " + intf + " -p tcp -m state --state NEW -m tcp --dport " + "443" + " -j ACCEPT;");
|
||||
|
||||
result = command.execute();
|
||||
if (result != null) {
|
||||
s_logger.warn("Error in opening up httpd port err=" + result );
|
||||
return;
|
||||
}
|
||||
|
||||
command = new Script("/bin/bash", s_logger);
|
||||
command.add("-c");
|
||||
command.add("if [ -d /etc/apache2 ] ; then service apache2 start; else service httpd start; fi ");
|
||||
result = command.execute();
|
||||
if (result != null) {
|
||||
s_logger.warn("Error in starting httpd service err=" + result );
|
||||
return;
|
||||
}
|
||||
command = new Script("mkdir", s_logger);
|
||||
command.add("-p");
|
||||
command.add("/var/www/html/copy/template");
|
||||
result = command.execute();
|
||||
if (result != null) {
|
||||
s_logger.warn("Error in creating directory =" + result );
|
||||
return;
|
||||
}
|
||||
|
||||
command = new Script("/bin/bash", s_logger);
|
||||
command.add("-c");
|
||||
command.add("ln -sf " + publicTemplateRepo + " /var/www/html/copy/template");
|
||||
result = command.execute();
|
||||
if (result != null) {
|
||||
s_logger.warn("Error in linking err=" + result );
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,138 +19,139 @@ import com.cloud.utils.nio.Link;
|
|||
|
||||
|
||||
public class ClusteredAgentAttache extends ConnectedAgentAttache implements Routable {
|
||||
private final static Logger s_logger = Logger.getLogger(ClusteredAgentAttache.class);
|
||||
private static ClusteredAgentManagerImpl s_clusteredAgentMgr;
|
||||
protected ByteBuffer _buffer = ByteBuffer.allocate(2048);
|
||||
private boolean _forward = false;
|
||||
private final static Logger s_logger = Logger.getLogger(ClusteredAgentAttache.class);
|
||||
private static ClusteredAgentManagerImpl s_clusteredAgentMgr;
|
||||
protected ByteBuffer _buffer = ByteBuffer.allocate(2048);
|
||||
private boolean _forward = false;
|
||||
|
||||
static public void initialize(ClusteredAgentManagerImpl agentMgr) {
|
||||
s_clusteredAgentMgr = agentMgr;
|
||||
}
|
||||
static public void initialize(ClusteredAgentManagerImpl agentMgr) {
|
||||
s_clusteredAgentMgr = agentMgr;
|
||||
}
|
||||
|
||||
public ClusteredAgentAttache(long id) {
|
||||
super(id, null, false);
|
||||
_forward = true;
|
||||
}
|
||||
|
||||
public ClusteredAgentAttache(long id, Link link, boolean maintenance) {
|
||||
super(id, link, maintenance);
|
||||
_forward = link == null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClosed() {
|
||||
return _forward ? false : super.isClosed();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean forForward() {
|
||||
return _forward;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusteredAgentAttache(long id) {
|
||||
super(id, null, false);
|
||||
_forward = true;
|
||||
}
|
||||
|
||||
public ClusteredAgentAttache(long id, Link link, boolean maintenance) {
|
||||
super(id, link, maintenance);
|
||||
_forward = link == null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClosed() {
|
||||
return _forward ? false : super.isClosed();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean forForward() {
|
||||
return _forward;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(long seq) {
|
||||
if (forForward()) {
|
||||
Listener listener = getListener(seq);
|
||||
if (listener != null && listener instanceof SynchronousListener) {
|
||||
SynchronousListener synchronous = (SynchronousListener)listener;
|
||||
String peerName = synchronous.getPeer();
|
||||
if (peerName != null) {
|
||||
s_logger.debug(log(seq, "Forwarding to peer to cancel due to timeout"));
|
||||
s_clusteredAgentMgr.cancel(peerName, _id, seq, "Timed Out");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
super.cancel(seq);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void routeToAgent(byte[] data) throws AgentUnavailableException {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug(log(Request.getSequence(data), "Routing from " + Request.getManagementServerId(data)));
|
||||
}
|
||||
|
||||
if (_link == null) {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug(log(Request.getSequence(data), "Link is closed"));
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
_link.send(data);
|
||||
} catch (ClosedChannelException e) {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug(log(Request.getSequence(data), "Channel is closed"));
|
||||
}
|
||||
|
||||
throw new AgentUnavailableException("Channel to agent is closed", _id);
|
||||
} catch (NullPointerException e) {
|
||||
if (forForward()) {
|
||||
Listener listener = getListener(seq);
|
||||
if (listener != null && listener instanceof SynchronousListener) {
|
||||
SynchronousListener synchronous = (SynchronousListener)listener;
|
||||
String peerName = synchronous.getPeer();
|
||||
if (peerName != null) {
|
||||
s_logger.debug(log(seq, "Forwarding to peer to cancel due to timeout"));
|
||||
s_clusteredAgentMgr.cancel(peerName, _id, seq, "Timed Out");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
super.cancel(seq);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void routeToAgent(byte[] data) throws AgentUnavailableException {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug(log(Request.getSequence(data), "Routing from " + Request.getManagementServerId(data)));
|
||||
}
|
||||
|
||||
if (_link == null) {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug(log(Request.getSequence(data), "Link is closed"));
|
||||
}
|
||||
// Note: since this block is not in synchronized. It is possible for _link to become null.
|
||||
throw new AgentUnavailableException("Link to agent is closed", _id);
|
||||
}
|
||||
|
||||
try {
|
||||
_link.send(data);
|
||||
} catch (ClosedChannelException e) {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug(log(Request.getSequence(data), "Channel is closed"));
|
||||
}
|
||||
|
||||
throw new AgentUnavailableException("Channel to agent is closed", _id);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(Request req, Listener listener) throws AgentUnavailableException {
|
||||
if (_link != null) {
|
||||
super.send(req, listener);
|
||||
return;
|
||||
}
|
||||
|
||||
long seq = req.getSequence();
|
||||
|
||||
if (listener != null) {
|
||||
registerListener(req.getSequence(), listener);
|
||||
}
|
||||
|
||||
int i = 0;
|
||||
SocketChannel ch = null;
|
||||
boolean error = true;
|
||||
try {
|
||||
while (i++ < 5) {
|
||||
String peerName = s_clusteredAgentMgr.findPeer(_id);
|
||||
if (peerName == null) {
|
||||
throw new AgentUnavailableException("Unable to find peer", _id);
|
||||
}
|
||||
|
||||
ch = s_clusteredAgentMgr.connectToPeer(peerName, ch);
|
||||
if (ch == null) {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug(log(seq, "Unable to forward " + req.toString()));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug(log(seq, "Forwarding " + req.toString() + " to " + peerName));
|
||||
}
|
||||
if (req.executeInSequence() && listener != null && listener instanceof SynchronousListener) {
|
||||
SynchronousListener synchronous = (SynchronousListener)listener;
|
||||
synchronous.setPeer(peerName);
|
||||
}
|
||||
Link.write(ch, req.toBytes());
|
||||
error = false;
|
||||
return;
|
||||
} catch (IOException e) {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug(log(seq, "Error on connecting to management node: " + req.toString() + " try = " + i));
|
||||
}
|
||||
|
||||
if(s_logger.isInfoEnabled()) {
|
||||
} catch (NullPointerException e) {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug(log(Request.getSequence(data), "Link is closed"));
|
||||
}
|
||||
// Note: since this block is not in synchronized. It is possible for _link to become null.
|
||||
throw new AgentUnavailableException("Unable to send", _id);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(Request req, Listener listener) throws AgentUnavailableException {
|
||||
if (_link != null) {
|
||||
super.send(req, listener);
|
||||
return;
|
||||
}
|
||||
|
||||
long seq = req.getSequence();
|
||||
|
||||
if (listener != null) {
|
||||
registerListener(req.getSequence(), listener);
|
||||
}
|
||||
|
||||
int i = 0;
|
||||
SocketChannel ch = null;
|
||||
boolean error = true;
|
||||
try {
|
||||
while (i++ < 5) {
|
||||
String peerName = s_clusteredAgentMgr.findPeer(_id);
|
||||
if (peerName == null) {
|
||||
throw new AgentUnavailableException("Unable to find peer", _id);
|
||||
}
|
||||
|
||||
ch = s_clusteredAgentMgr.connectToPeer(peerName, ch);
|
||||
if (ch == null) {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug(log(seq, "Unable to forward " + req.toString()));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug(log(seq, "Forwarding " + req.toString() + " to " + peerName));
|
||||
}
|
||||
if (req.executeInSequence() && listener != null && listener instanceof SynchronousListener) {
|
||||
SynchronousListener synchronous = (SynchronousListener)listener;
|
||||
synchronous.setPeer(peerName);
|
||||
}
|
||||
Link.write(ch, req.toBytes());
|
||||
error = false;
|
||||
return;
|
||||
} catch (IOException e) {
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug(log(seq, "Error on connecting to management node: " + req.toString() + " try = " + i));
|
||||
}
|
||||
|
||||
if(s_logger.isInfoEnabled()) {
|
||||
s_logger.info("IOException " + e.getMessage() + " when sending data to peer " + peerName + ", close peer connection and let it re-open");
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (error) {
|
||||
unregisterListener(seq);
|
||||
}
|
||||
}
|
||||
throw new AgentUnavailableException("Unable to reach the peer that the agent is connected", _id);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (error) {
|
||||
unregisterListener(seq);
|
||||
}
|
||||
}
|
||||
throw new AgentUnavailableException("Unable to reach the peer that the agent is connected", _id);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue