diff --git a/plugins/hypervisors/xen/src/com/cloud/hypervisor/xen/resource/XenServerStorageProcessor.java b/plugins/hypervisors/xen/src/com/cloud/hypervisor/xen/resource/XenServerStorageProcessor.java
index c0b04b89aeb..a3932e47dd8 100644
--- a/plugins/hypervisors/xen/src/com/cloud/hypervisor/xen/resource/XenServerStorageProcessor.java
+++ b/plugins/hypervisors/xen/src/com/cloud/hypervisor/xen/resource/XenServerStorageProcessor.java
@@ -18,6 +18,37 @@
*/
package com.cloud.hypervisor.xen.resource;
+import static com.cloud.utils.ReflectUtil.flattenProperties;
+import static com.google.common.collect.Lists.newArrayList;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.cloudstack.storage.command.AttachAnswer;
+import org.apache.cloudstack.storage.command.AttachCommand;
+import org.apache.cloudstack.storage.command.AttachPrimaryDataStoreAnswer;
+import org.apache.cloudstack.storage.command.AttachPrimaryDataStoreCmd;
+import org.apache.cloudstack.storage.command.CopyCmdAnswer;
+import org.apache.cloudstack.storage.command.CopyCommand;
+import org.apache.cloudstack.storage.command.CreateObjectAnswer;
+import org.apache.cloudstack.storage.command.CreateObjectCommand;
+import org.apache.cloudstack.storage.command.DeleteCommand;
+import org.apache.cloudstack.storage.command.DettachAnswer;
+import org.apache.cloudstack.storage.command.DettachCommand;
+import org.apache.cloudstack.storage.datastore.protocol.DataStoreProtocol;
+import org.apache.cloudstack.storage.to.PrimaryDataStoreTO;
+import org.apache.cloudstack.storage.to.SnapshotObjectTO;
+import org.apache.cloudstack.storage.to.TemplateObjectTO;
+import org.apache.cloudstack.storage.to.VolumeObjectTO;
+import org.apache.log4j.Logger;
+import org.apache.xmlrpc.XmlRpcException;
+
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.CreateStoragePoolCommand;
import com.cloud.agent.api.to.DataObjectType;
@@ -51,36 +82,6 @@ import com.xensource.xenapi.VBD;
import com.xensource.xenapi.VDI;
import com.xensource.xenapi.VM;
import com.xensource.xenapi.VMGuestMetrics;
-import org.apache.cloudstack.storage.command.AttachAnswer;
-import org.apache.cloudstack.storage.command.AttachCommand;
-import org.apache.cloudstack.storage.command.AttachPrimaryDataStoreAnswer;
-import org.apache.cloudstack.storage.command.AttachPrimaryDataStoreCmd;
-import org.apache.cloudstack.storage.command.CopyCmdAnswer;
-import org.apache.cloudstack.storage.command.CopyCommand;
-import org.apache.cloudstack.storage.command.CreateObjectAnswer;
-import org.apache.cloudstack.storage.command.CreateObjectCommand;
-import org.apache.cloudstack.storage.command.DeleteCommand;
-import org.apache.cloudstack.storage.command.DettachAnswer;
-import org.apache.cloudstack.storage.command.DettachCommand;
-import org.apache.cloudstack.storage.datastore.protocol.DataStoreProtocol;
-import org.apache.cloudstack.storage.to.PrimaryDataStoreTO;
-import org.apache.cloudstack.storage.to.SnapshotObjectTO;
-import org.apache.cloudstack.storage.to.TemplateObjectTO;
-import org.apache.cloudstack.storage.to.VolumeObjectTO;
-import org.apache.log4j.Logger;
-import org.apache.xmlrpc.XmlRpcException;
-
-import java.io.File;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import static com.cloud.utils.ReflectUtil.flattenProperties;
-import static com.google.common.collect.Lists.newArrayList;
public class XenServerStorageProcessor implements StorageProcessor {
private static final Logger s_logger = Logger.getLogger(XenServerStorageProcessor.class);
@@ -1080,11 +1081,12 @@ public class XenServerStorageProcessor implements StorageProcessor {
S3Utils.ClientOptions.class));
// https workaround for Introspector bug that does not
// recognize Boolean accessor methods ...
+
parameters.addAll(Arrays.asList("operation", "put", "filename",
dir + "/" + filename, "iSCSIFlag",
iSCSIFlag.toString(), "bucket", s3.getBucketName(),
"key", key, "https", s3.isHttps() != null ? s3.isHttps().toString()
- : "null"));
+ : "null", "maxSingleUploadSizeInBytes", String.valueOf(s3.getMaxSingleUploadSizeInBytes())));
final String result = hypervisorResource.callHostPluginAsync(connection, "s3xen",
"s3", wait,
parameters.toArray(new String[parameters.size()]));
diff --git a/scripts/vm/hypervisor/xenserver/s3xen b/scripts/vm/hypervisor/xenserver/s3xen
index 372a6daaddc..bf81bbd34a6 100644
--- a/scripts/vm/hypervisor/xenserver/s3xen
+++ b/scripts/vm/hypervisor/xenserver/s3xen
@@ -34,6 +34,7 @@ import base64
import hmac
import traceback
import urllib2
+from xml.dom.minidom import parseString
import XenAPIPlugin
sys.path.extend(["/opt/xensource/sm/"])
@@ -260,15 +261,73 @@ class S3Client(object):
sha).digest())[:-1]
return signature, request_date
+
+ def getText(self, nodelist):
+ rc = []
+ for node in nodelist:
+ if node.nodeType == node.TEXT_NODE:
+ rc.append(node.data)
+ return ''.join(rc)
- def put(self, bucket, key, src_filename):
+ def multiUpload(self, bucket, key, src_fileName, chunkSize=5 * 1024 * 1024):
+ uploadId={}
+ def readInitalMultipart(response):
+ data = response.read()
+ xmlResult = parseString(data)
+ result = xmlResult.getElementsByTagName("InitiateMultipartUploadResult")[0]
+ upload = result.getElementsByTagName("UploadId")[0]
+ uploadId["0"] = upload.childNodes[0].data
+
+ self.do_operation('POST', bucket, key + "?uploads", fn_read=readInitalMultipart)
+
+ fileSize = os.path.getsize(src_fileName)
+ parts = fileSize / chunkSize + ((fileSize % chunkSize) and 1)
+ part = 1
+ srcFile = open(src_fileName, 'rb')
+ etags = []
+ while part <= parts:
+ offset = part - 1
+ size = min(fileSize - offset * chunkSize, chunkSize)
+ headers = {
+ self.HEADER_CONTENT_LENGTH: size
+ }
+ def send_body(connection):
+ srcFile.seek(offset * chunkSize)
+ block = srcFile.read(size)
+ connection.send(block)
+ def read_multiPart(response):
+ etag = response.getheader('ETag')
+ etags.append((part, etag))
+ self.do_operation("PUT", bucket, "%s?partNumber=%s&uploadId=%s"%(key, part, uploadId["0"]), headers, send_body, read_multiPart)
+ part = part + 1
+ srcFile.close()
+
+ data = []
+ partXml = "%i%s"
+ for etag in etags:
+ data.append(partXml%etag)
+ msg = "%s"%("".join(data))
+ size = len(msg)
+ headers = {
+ self.HEADER_CONTENT_LENGTH: size
+ }
+ def send_complete_multipart(connection):
+ connection.send(msg)
+ self.do_operation("POST", bucket, "%s?uploadId=%s"%(key, uploadId["0"]), headers, send_complete_multipart)
+
+ def put(self, bucket, key, src_filename, maxSingleUpload):
if not os.path.isfile(src_filename):
raise Exception(
"Attempt to put " + src_filename + " that does not exist.")
+ size = os.path.getsize(src_filename)
+ if size > maxSingleUpload or maxSingleUpload == 0:
+ return self.multiUpload(bucket, key, src_filename)
+
headers = {
self.HEADER_CONTENT_MD5: compute_md5(src_filename),
+
self.HEADER_CONTENT_TYPE: 'application/octet-stream',
self.HEADER_CONTENT_LENGTH: str(os.stat(src_filename).st_size),
}
@@ -323,6 +382,7 @@ def parseArguments(args):
bucket = args['bucket']
key = args['key']
filename = args['filename']
+ maxSingleUploadBytes = int(args["maxSingleUploadSizeInBytes"])
if is_blank(operation):
raise ValueError('An operation must be specified.')
@@ -336,18 +396,18 @@ def parseArguments(args):
if is_blank(filename):
raise ValueError('A filename must be specified.')
- return client, operation, bucket, key, filename
+ return client, operation, bucket, key, filename, maxSingleUploadBytes
@echo
def s3(session, args):
- client, operation, bucket, key, filename = parseArguments(args)
+ client, operation, bucket, key, filename, maxSingleUploadBytes = parseArguments(args)
try:
if operation == 'put':
- client.put(bucket, key, filename)
+ client.put(bucket, key, filename, maxSingleUploadBytes)
elif operation == 'get':
client.get(bucket, key, filename)
elif operation == 'delete':