From f1895ea39ae590ed7ff423649420659650b9853f Mon Sep 17 00:00:00 2001 From: Min Chen Date: Fri, 4 Oct 2013 18:21:01 -0700 Subject: [PATCH] CLOUDSTACK-4816: Make S3 upload multipart or singlepart configurable. --- api/src/com/cloud/agent/api/to/S3TO.java | 14 +++- .../template/S3TemplateDownloader.java | 23 ++--- .../driver/S3ImageStoreDriverImpl.java | 5 +- .../src/com/cloud/configuration/Config.java | 1 + .../resource/NfsSecondaryStorageResource.java | 84 ++++++++++--------- setup/db/db/schema-420to430.sql | 2 + utils/src/com/cloud/utils/S3Utils.java | 56 +++++++++++++ 7 files changed, 132 insertions(+), 53 deletions(-) diff --git a/api/src/com/cloud/agent/api/to/S3TO.java b/api/src/com/cloud/agent/api/to/S3TO.java index b1b692a8bad..ab08a696c96 100644 --- a/api/src/com/cloud/agent/api/to/S3TO.java +++ b/api/src/com/cloud/agent/api/to/S3TO.java @@ -39,6 +39,7 @@ public final class S3TO implements S3Utils.ClientOptions, DataStoreTO { private Integer socketTimeout; private Date created; private boolean enableRRS; + private boolean multipartEnabled; public S3TO() { @@ -50,7 +51,7 @@ public final class S3TO implements S3Utils.ClientOptions, DataStoreTO { final String secretKey, final String endPoint, final String bucketName, final Boolean httpsFlag, final Integer connectionTimeout, final Integer maxErrorRetry, - final Integer socketTimeout, final Date created, final boolean enableRRS) { + final Integer socketTimeout, final Date created, final boolean enableRRS, final boolean multipart) { super(); @@ -66,6 +67,7 @@ public final class S3TO implements S3Utils.ClientOptions, DataStoreTO { this.socketTimeout = socketTimeout; this.created = created; this.enableRRS = enableRRS; + this.multipartEnabled = multipart; } @@ -268,7 +270,6 @@ public final class S3TO implements S3Utils.ClientOptions, DataStoreTO { } - public boolean getEnableRRS() { return enableRRS; } @@ -277,5 +278,14 @@ public final class S3TO implements S3Utils.ClientOptions, DataStoreTO { this.enableRRS = enableRRS; } + public boolean isMultipartEnabled() { + return multipartEnabled; + } + + public void setMultipartEnabled(boolean multipartEnabled) { + this.multipartEnabled = multipartEnabled; + } + + } diff --git a/core/src/com/cloud/storage/template/S3TemplateDownloader.java b/core/src/com/cloud/storage/template/S3TemplateDownloader.java index dd595ea3c97..462b21b700b 100644 --- a/core/src/com/cloud/storage/template/S3TemplateDownloader.java +++ b/core/src/com/cloud/storage/template/S3TemplateDownloader.java @@ -47,8 +47,6 @@ import com.amazonaws.services.s3.model.ProgressEvent; import com.amazonaws.services.s3.model.ProgressListener; import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.StorageClass; -import com.amazonaws.services.s3.transfer.TransferManager; -import com.amazonaws.services.s3.transfer.Upload; import org.apache.cloudstack.managed.context.ManagedContextRunnable; import org.apache.cloudstack.storage.command.DownloadCommand.ResourceType; @@ -227,9 +225,6 @@ public class S3TemplateDownloader extends ManagedContextRunnable implements Temp // compute s3 key s3Key = join(asList(installPath, fileName), S3Utils.SEPARATOR); - // multi-part upload using S3 api to handle > 5G input stream - TransferManager tm = new TransferManager(S3Utils.acquireClient(s3)); - // download using S3 API ObjectMetadata metadata = new ObjectMetadata(); metadata.setContentLength(remoteSize); @@ -262,11 +257,19 @@ public class S3TemplateDownloader extends ManagedContextRunnable implements Temp } }); - // TransferManager processes all transfers asynchronously, - // so this call will return immediately. - Upload upload = tm.upload(putObjectRequest); - - upload.waitForCompletion(); + + if ( s3.isMultipartEnabled()){ + // use TransferManager to do multipart upload + S3Utils.mputObject(s3, putObjectRequest); + } else{ + // single part upload, with 5GB limit in Amazon + S3Utils.putObject(s3, putObjectRequest); + while (status != TemplateDownloader.Status.DOWNLOAD_FINISHED && + status != TemplateDownloader.Status.UNRECOVERABLE_ERROR && + status != TemplateDownloader.Status.ABORTED) { + // wait for completion + } + } // finished or aborted Date finish = new Date(); diff --git a/plugins/storage/image/s3/src/org/apache/cloudstack/storage/datastore/driver/S3ImageStoreDriverImpl.java b/plugins/storage/image/s3/src/org/apache/cloudstack/storage/datastore/driver/S3ImageStoreDriverImpl.java index 7ca482422e3..f31aea3bcb1 100644 --- a/plugins/storage/image/s3/src/org/apache/cloudstack/storage/datastore/driver/S3ImageStoreDriverImpl.java +++ b/plugins/storage/image/s3/src/org/apache/cloudstack/storage/datastore/driver/S3ImageStoreDriverImpl.java @@ -66,7 +66,10 @@ public class S3ImageStoreDriverImpl extends BaseImageStoreDriverImpl { details.get(ApiConstants.S3_SOCKET_TIMEOUT) == null ? null : Integer.valueOf(details .get(ApiConstants.S3_SOCKET_TIMEOUT)), imgStore.getCreated(), _configDao.getValue(Config.S3EnableRRS.toString()) == null ? false : Boolean.parseBoolean(_configDao - .getValue(Config.S3EnableRRS.toString()))); + .getValue(Config.S3EnableRRS.toString())), + _configDao.getValue(Config.S3EnableMultiPartUpload.toString()) == null ? true : Boolean.parseBoolean(_configDao + .getValue(Config.S3EnableMultiPartUpload.toString())) + ); } diff --git a/server/src/com/cloud/configuration/Config.java b/server/src/com/cloud/configuration/Config.java index 8ca595b14d0..1377bf72161 100755 --- a/server/src/com/cloud/configuration/Config.java +++ b/server/src/com/cloud/configuration/Config.java @@ -376,6 +376,7 @@ public enum Config { // object store S3EnableRRS("Advanced", ManagementServer.class, Boolean.class, "s3.rrs.enabled", "false", "enable s3 reduced redundancy storage", null), + S3EnableMultiPartUpload("Advanced", ManagementServer.class, Boolean.class, "s3.multipart.enabled", "true", "enable s3 multipart upload", null), // Ldap LdapBasedn("Advanced", ManagementServer.class, String.class, "ldap.basedn", null, "Sets the basedn for LDAP", null), diff --git a/services/secondary-storage/src/org/apache/cloudstack/storage/resource/NfsSecondaryStorageResource.java b/services/secondary-storage/src/org/apache/cloudstack/storage/resource/NfsSecondaryStorageResource.java index e26f02d89f4..85d25f9a860 100755 --- a/services/secondary-storage/src/org/apache/cloudstack/storage/resource/NfsSecondaryStorageResource.java +++ b/services/secondary-storage/src/org/apache/cloudstack/storage/resource/NfsSecondaryStorageResource.java @@ -16,6 +16,15 @@ // under the License. package org.apache.cloudstack.storage.resource; + +import static com.cloud.utils.S3Utils.mputFile; +import static com.cloud.utils.S3Utils.putFile; +import static com.cloud.utils.StringUtils.join; +import static com.cloud.utils.db.GlobalLock.executeWithNoWaitLock; +import static java.lang.String.format; +import static java.util.Arrays.asList; +import static org.apache.commons.lang.StringUtils.substringAfterLast; + import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; @@ -40,21 +49,6 @@ import java.util.concurrent.Callable; import javax.naming.ConfigurationException; -import org.apache.cloudstack.storage.command.CopyCmdAnswer; -import org.apache.cloudstack.storage.command.CopyCommand; -import org.apache.cloudstack.storage.command.DeleteCommand; -import org.apache.cloudstack.storage.command.DownloadCommand; -import org.apache.cloudstack.storage.command.DownloadProgressCommand; -import org.apache.cloudstack.storage.template.DownloadManager; -import org.apache.cloudstack.storage.template.DownloadManagerImpl; -import org.apache.cloudstack.storage.template.DownloadManagerImpl.ZfsPathParser; -import org.apache.cloudstack.storage.template.UploadManager; -import org.apache.cloudstack.storage.template.UploadManagerImpl; -import org.apache.cloudstack.storage.to.ImageStoreTO; -import org.apache.cloudstack.storage.to.PrimaryDataStoreTO; -import org.apache.cloudstack.storage.to.SnapshotObjectTO; -import org.apache.cloudstack.storage.to.TemplateObjectTO; -import org.apache.cloudstack.storage.to.VolumeObjectTO; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang.StringUtils; import org.apache.http.HttpEntity; @@ -67,6 +61,21 @@ import org.apache.http.impl.client.DefaultHttpClient; import org.apache.log4j.Logger; import com.amazonaws.services.s3.model.S3ObjectSummary; + +import org.apache.cloudstack.storage.command.CopyCmdAnswer; +import org.apache.cloudstack.storage.command.CopyCommand; +import org.apache.cloudstack.storage.command.DeleteCommand; +import org.apache.cloudstack.storage.command.DownloadCommand; +import org.apache.cloudstack.storage.command.DownloadProgressCommand; +import org.apache.cloudstack.storage.template.DownloadManager; +import org.apache.cloudstack.storage.template.DownloadManagerImpl; +import org.apache.cloudstack.storage.template.DownloadManagerImpl.ZfsPathParser; +import org.apache.cloudstack.storage.template.UploadManager; +import org.apache.cloudstack.storage.template.UploadManagerImpl; +import org.apache.cloudstack.storage.to.SnapshotObjectTO; +import org.apache.cloudstack.storage.to.TemplateObjectTO; +import org.apache.cloudstack.storage.to.VolumeObjectTO; + import com.cloud.agent.api.Answer; import com.cloud.agent.api.CheckHealthAnswer; import com.cloud.agent.api.CheckHealthCommand; @@ -108,7 +117,6 @@ import com.cloud.hypervisor.Hypervisor.HypervisorType; import com.cloud.resource.ServerResourceBase; import com.cloud.storage.DataStoreRole; import com.cloud.storage.Storage.ImageFormat; -import com.cloud.storage.Storage.StoragePoolType; import com.cloud.storage.StorageLayer; import com.cloud.storage.VMTemplateStorageResourceAssoc; import com.cloud.storage.template.Processor; @@ -128,14 +136,6 @@ import com.cloud.utils.net.NetUtils; import com.cloud.utils.script.OutputInterpreter; import com.cloud.utils.script.Script; import com.cloud.vm.SecondaryStorageVm; -import com.google.common.io.Files; - -import static com.cloud.utils.S3Utils.putFile; -import static com.cloud.utils.StringUtils.join; -import static com.cloud.utils.db.GlobalLock.executeWithNoWaitLock; -import static java.lang.String.format; -import static java.util.Arrays.asList; -import static org.apache.commons.lang.StringUtils.substringAfterLast; public class NfsSecondaryStorageResource extends ServerResourceBase implements SecondaryStorageResource { @@ -197,7 +197,7 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S } public void setInSystemVM(boolean inSystemVM) { - this._inSystemVM = inSystemVM; + _inSystemVM = inSystemVM; } @Override @@ -297,7 +297,7 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S String finalFileName = templateFilename; String finalDownloadPath = destPath + File.separator + templateFilename; // compute the size of - long size = this._storage.getSize(downloadPath + File.separator + templateFilename); + long size = _storage.getSize(downloadPath + File.separator + templateFilename); DataTO newDestTO = null; @@ -374,7 +374,7 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S protected Answer copySnapshotToTemplateFromNfsToNfsXenserver(CopyCommand cmd, SnapshotObjectTO srcData, NfsTO srcDataStore, TemplateObjectTO destData, NfsTO destDataStore) { - String srcMountPoint = this.getRootDir(srcDataStore.getUrl()); + String srcMountPoint = getRootDir(srcDataStore.getUrl()); String snapshotPath = srcData.getPath(); int index = snapshotPath.lastIndexOf("/"); String snapshotName = snapshotPath.substring(index + 1); @@ -384,16 +384,16 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S snapshotPath = snapshotPath.substring(0, index); snapshotPath = srcMountPoint + File.separator + snapshotPath; - String destMountPoint = this.getRootDir(destDataStore.getUrl()); + String destMountPoint = getRootDir(destDataStore.getUrl()); String destPath = destMountPoint + File.separator + destData.getPath(); String errMsg = null; try { - this._storage.mkdir(destPath); + _storage.mkdir(destPath); String templateUuid = UUID.randomUUID().toString(); String templateName = templateUuid + ".vhd"; - Script command = new Script(this.createTemplateFromSnapshotXenScript, cmd.getWait() * 1000, s_logger); + Script command = new Script(createTemplateFromSnapshotXenScript, cmd.getWait() * 1000, s_logger); command.add("-p", snapshotPath); command.add("-s", snapshotName); command.add("-n", templateName); @@ -468,7 +468,7 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S bufferWriter.write("\n"); bufferWriter.write("filename=" + fileName); bufferWriter.write("\n"); - long size = this._storage.getSize(destFileFullPath); + long size = _storage.getSize(destFileFullPath); bufferWriter.write("size=" + size); bufferWriter.close(); writer.close(); @@ -630,7 +630,7 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S NfsTO destImageStore = (NfsTO) destDataStore; if (srcDataStore instanceof S3TO) { S3TO s3 = (S3TO) srcDataStore; - return this.copyFromS3ToNfs(cmd, srcData, s3, destData, destImageStore); + return copyFromS3ToNfs(cmd, srcData, s3, destData, destImageStore); } else if (srcDataStore instanceof SwiftTO) { return copyFromSwiftToNfs(cmd, srcData, (SwiftTO)srcDataStore, destData, destImageStore); } @@ -857,9 +857,13 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S } } } - ImageFormat format = this.getTemplateFormat(srcFile.getName()); + ImageFormat format = getTemplateFormat(srcFile.getName()); String key = destData.getPath() + S3Utils.SEPARATOR + srcFile.getName(); - putFile(s3, srcFile, bucket, key); + if (s3.isMultipartEnabled()){ + mputFile(s3, srcFile, bucket, key); + } else{ + putFile(s3, srcFile, bucket, key); + } DataTO retObj = null; if (destData.getObjectType() == DataObjectType.TEMPLATE) { @@ -1271,9 +1275,9 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S int index = name.lastIndexOf(File.separator); String snapshotPath = name.substring(0, index); if (deleteAllFlag) { - lPath = this.getRootDir(secondaryStorageUrl) + File.separator + snapshotPath + File.separator + "*"; + lPath = getRootDir(secondaryStorageUrl) + File.separator + snapshotPath + File.separator + "*"; } else { - lPath = this.getRootDir(secondaryStorageUrl) + File.separator + name + "*"; + lPath = getRootDir(secondaryStorageUrl) + File.separator + name + "*"; } final String result = deleteLocalFile(lPath); @@ -1461,7 +1465,7 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S Map s3ListTemplate(S3TO s3) { String bucket = s3.getBucketName(); // List the objects in the source directory on S3 - final List objectSummaries = S3Utils.getDirectory(s3, bucket, this.TEMPLATE_ROOT_DIR); + final List objectSummaries = S3Utils.getDirectory(s3, bucket, TEMPLATE_ROOT_DIR); if (objectSummaries == null) { return null; } @@ -1470,7 +1474,7 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S String key = objectSummary.getKey(); // String installPath = StringUtils.substringBeforeLast(key, // S3Utils.SEPARATOR); - String uniqueName = this.determineS3TemplateNameFromKey(key); + String uniqueName = determineS3TemplateNameFromKey(key); // TODO: isPublic value, where to get? TemplateProp tInfo = new TemplateProp(uniqueName, key, objectSummary.getSize(), objectSummary.getSize(), true, false); @@ -1483,7 +1487,7 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S Map s3ListVolume(S3TO s3) { String bucket = s3.getBucketName(); // List the objects in the source directory on S3 - final List objectSummaries = S3Utils.getDirectory(s3, bucket, this.VOLUME_ROOT_DIR); + final List objectSummaries = S3Utils.getDirectory(s3, bucket, VOLUME_ROOT_DIR); if (objectSummaries == null) { return null; } @@ -1492,7 +1496,7 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S String key = objectSummary.getKey(); // String installPath = StringUtils.substringBeforeLast(key, // S3Utils.SEPARATOR); - Long id = this.determineS3VolumeIdFromKey(key); + Long id = determineS3VolumeIdFromKey(key); // TODO: how to get volume template name TemplateProp tInfo = new TemplateProp(id.toString(), key, objectSummary.getSize(), objectSummary.getSize(), true, false); diff --git a/setup/db/db/schema-420to430.sql b/setup/db/db/schema-420to430.sql index 6d8cfe21dad..653ff77c090 100644 --- a/setup/db/db/schema-420to430.sql +++ b/setup/db/db/schema-420to430.sql @@ -389,4 +389,6 @@ CREATE VIEW `cloud`.`volume_view` AS `cloud`.`async_job` ON async_job.instance_id = volumes.id and async_job.instance_type = 'Volume' and async_job.job_status = 0; + +INSERT IGNORE INTO `cloud`.`configuration`(category, instance, component, name, value, description, default_value) VALUES ('Advanced', 'DEFAULT', 'management-server', 's3.multipart.enabled', 'true', 'enable s3 multipart upload', 'true'); diff --git a/utils/src/com/cloud/utils/S3Utils.java b/utils/src/com/cloud/utils/S3Utils.java index 5ee578304d5..ce4d4b723e5 100644 --- a/utils/src/com/cloud/utils/S3Utils.java +++ b/utils/src/com/cloud/utils/S3Utils.java @@ -48,6 +48,7 @@ import org.apache.commons.lang.ArrayUtils; import org.apache.log4j.Logger; import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; import com.amazonaws.ClientConfiguration; import com.amazonaws.HttpMethod; import com.amazonaws.auth.AWSCredentials; @@ -61,6 +62,9 @@ import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.transfer.TransferManager; +import com.amazonaws.services.s3.transfer.Upload; + import com.cloud.utils.exception.CloudRuntimeException; public final class S3Utils { @@ -171,6 +175,58 @@ public final class S3Utils { } + // multi-part upload file + public static void mputFile(final ClientOptions clientOptions, + final File sourceFile, final String bucketName, final String key) throws InterruptedException { + + assert clientOptions != null; + assert sourceFile != null; + assert !isBlank(bucketName); + assert !isBlank(key); + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(format("Multipart sending file %1$s as S3 object %2$s in " + + "bucket %3$s", sourceFile.getName(), key, bucketName)); + } + TransferManager tm = new TransferManager(S3Utils.acquireClient(clientOptions)); + Upload upload = tm.upload(bucketName, key, sourceFile); + upload.waitForCompletion(); + } + + // multi-part upload object + public static void mputObject(final ClientOptions clientOptions, + final InputStream sourceStream, final String bucketName, final String key) throws InterruptedException { + + assert clientOptions != null; + assert sourceStream != null; + assert !isBlank(bucketName); + assert !isBlank(key); + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(format("Multipart sending stream as S3 object %1$s in " + + "bucket %2$s", key, bucketName)); + } + TransferManager tm = new TransferManager(S3Utils.acquireClient(clientOptions)); + Upload upload = tm.upload(bucketName, key, sourceStream, null); + upload.waitForCompletion(); + } + + // multi-part upload object + public static void mputObject(final ClientOptions clientOptions, + final PutObjectRequest req) throws InterruptedException { + + assert clientOptions != null; + assert req != null; + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Multipart sending object to S3 using PutObjectRequest"); + } + TransferManager tm = new TransferManager(S3Utils.acquireClient(clientOptions)); + Upload upload = tm.upload(req); + upload.waitForCompletion(); + + } + public static void setObjectAcl(final ClientOptions clientOptions, final String bucketName, final String key, final CannedAccessControlList acl) {