CLOUDSTACK-4816: Make S3 upload multipart or singlepart configurable.

This commit is contained in:
Min Chen 2013-10-04 18:21:01 -07:00
parent a6852a340d
commit f1895ea39a
7 changed files with 132 additions and 53 deletions

View File

@ -39,6 +39,7 @@ public final class S3TO implements S3Utils.ClientOptions, DataStoreTO {
private Integer socketTimeout;
private Date created;
private boolean enableRRS;
private boolean multipartEnabled;
public S3TO() {
@ -50,7 +51,7 @@ public final class S3TO implements S3Utils.ClientOptions, DataStoreTO {
final String secretKey, final String endPoint,
final String bucketName, final Boolean httpsFlag,
final Integer connectionTimeout, final Integer maxErrorRetry,
final Integer socketTimeout, final Date created, final boolean enableRRS) {
final Integer socketTimeout, final Date created, final boolean enableRRS, final boolean multipart) {
super();
@ -66,6 +67,7 @@ public final class S3TO implements S3Utils.ClientOptions, DataStoreTO {
this.socketTimeout = socketTimeout;
this.created = created;
this.enableRRS = enableRRS;
this.multipartEnabled = multipart;
}
@ -268,7 +270,6 @@ public final class S3TO implements S3Utils.ClientOptions, DataStoreTO {
}
public boolean getEnableRRS() {
return enableRRS;
}
@ -277,5 +278,14 @@ public final class S3TO implements S3Utils.ClientOptions, DataStoreTO {
this.enableRRS = enableRRS;
}
public boolean isMultipartEnabled() {
return multipartEnabled;
}
public void setMultipartEnabled(boolean multipartEnabled) {
this.multipartEnabled = multipartEnabled;
}
}

View File

@ -47,8 +47,6 @@ import com.amazonaws.services.s3.model.ProgressEvent;
import com.amazonaws.services.s3.model.ProgressListener;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.StorageClass;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.Upload;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.cloudstack.storage.command.DownloadCommand.ResourceType;
@ -227,9 +225,6 @@ public class S3TemplateDownloader extends ManagedContextRunnable implements Temp
// compute s3 key
s3Key = join(asList(installPath, fileName), S3Utils.SEPARATOR);
// multi-part upload using S3 api to handle > 5G input stream
TransferManager tm = new TransferManager(S3Utils.acquireClient(s3));
// download using S3 API
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(remoteSize);
@ -262,11 +257,19 @@ public class S3TemplateDownloader extends ManagedContextRunnable implements Temp
}
});
// TransferManager processes all transfers asynchronously,
// so this call will return immediately.
Upload upload = tm.upload(putObjectRequest);
upload.waitForCompletion();
if ( s3.isMultipartEnabled()){
// use TransferManager to do multipart upload
S3Utils.mputObject(s3, putObjectRequest);
} else{
// single part upload, with 5GB limit in Amazon
S3Utils.putObject(s3, putObjectRequest);
while (status != TemplateDownloader.Status.DOWNLOAD_FINISHED &&
status != TemplateDownloader.Status.UNRECOVERABLE_ERROR &&
status != TemplateDownloader.Status.ABORTED) {
// wait for completion
}
}
// finished or aborted
Date finish = new Date();

View File

@ -66,7 +66,10 @@ public class S3ImageStoreDriverImpl extends BaseImageStoreDriverImpl {
details.get(ApiConstants.S3_SOCKET_TIMEOUT) == null ? null : Integer.valueOf(details
.get(ApiConstants.S3_SOCKET_TIMEOUT)), imgStore.getCreated(),
_configDao.getValue(Config.S3EnableRRS.toString()) == null ? false : Boolean.parseBoolean(_configDao
.getValue(Config.S3EnableRRS.toString())));
.getValue(Config.S3EnableRRS.toString())),
_configDao.getValue(Config.S3EnableMultiPartUpload.toString()) == null ? true : Boolean.parseBoolean(_configDao
.getValue(Config.S3EnableMultiPartUpload.toString()))
);
}

View File

@ -376,6 +376,7 @@ public enum Config {
// object store
S3EnableRRS("Advanced", ManagementServer.class, Boolean.class, "s3.rrs.enabled", "false", "enable s3 reduced redundancy storage", null),
S3EnableMultiPartUpload("Advanced", ManagementServer.class, Boolean.class, "s3.multipart.enabled", "true", "enable s3 multipart upload", null),
// Ldap
LdapBasedn("Advanced", ManagementServer.class, String.class, "ldap.basedn", null, "Sets the basedn for LDAP", null),

View File

@ -16,6 +16,15 @@
// under the License.
package org.apache.cloudstack.storage.resource;
import static com.cloud.utils.S3Utils.mputFile;
import static com.cloud.utils.S3Utils.putFile;
import static com.cloud.utils.StringUtils.join;
import static com.cloud.utils.db.GlobalLock.executeWithNoWaitLock;
import static java.lang.String.format;
import static java.util.Arrays.asList;
import static org.apache.commons.lang.StringUtils.substringAfterLast;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
@ -40,21 +49,6 @@ import java.util.concurrent.Callable;
import javax.naming.ConfigurationException;
import org.apache.cloudstack.storage.command.CopyCmdAnswer;
import org.apache.cloudstack.storage.command.CopyCommand;
import org.apache.cloudstack.storage.command.DeleteCommand;
import org.apache.cloudstack.storage.command.DownloadCommand;
import org.apache.cloudstack.storage.command.DownloadProgressCommand;
import org.apache.cloudstack.storage.template.DownloadManager;
import org.apache.cloudstack.storage.template.DownloadManagerImpl;
import org.apache.cloudstack.storage.template.DownloadManagerImpl.ZfsPathParser;
import org.apache.cloudstack.storage.template.UploadManager;
import org.apache.cloudstack.storage.template.UploadManagerImpl;
import org.apache.cloudstack.storage.to.ImageStoreTO;
import org.apache.cloudstack.storage.to.PrimaryDataStoreTO;
import org.apache.cloudstack.storage.to.SnapshotObjectTO;
import org.apache.cloudstack.storage.to.TemplateObjectTO;
import org.apache.cloudstack.storage.to.VolumeObjectTO;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpEntity;
@ -67,6 +61,21 @@ import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.log4j.Logger;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.apache.cloudstack.storage.command.CopyCmdAnswer;
import org.apache.cloudstack.storage.command.CopyCommand;
import org.apache.cloudstack.storage.command.DeleteCommand;
import org.apache.cloudstack.storage.command.DownloadCommand;
import org.apache.cloudstack.storage.command.DownloadProgressCommand;
import org.apache.cloudstack.storage.template.DownloadManager;
import org.apache.cloudstack.storage.template.DownloadManagerImpl;
import org.apache.cloudstack.storage.template.DownloadManagerImpl.ZfsPathParser;
import org.apache.cloudstack.storage.template.UploadManager;
import org.apache.cloudstack.storage.template.UploadManagerImpl;
import org.apache.cloudstack.storage.to.SnapshotObjectTO;
import org.apache.cloudstack.storage.to.TemplateObjectTO;
import org.apache.cloudstack.storage.to.VolumeObjectTO;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.CheckHealthAnswer;
import com.cloud.agent.api.CheckHealthCommand;
@ -108,7 +117,6 @@ import com.cloud.hypervisor.Hypervisor.HypervisorType;
import com.cloud.resource.ServerResourceBase;
import com.cloud.storage.DataStoreRole;
import com.cloud.storage.Storage.ImageFormat;
import com.cloud.storage.Storage.StoragePoolType;
import com.cloud.storage.StorageLayer;
import com.cloud.storage.VMTemplateStorageResourceAssoc;
import com.cloud.storage.template.Processor;
@ -128,14 +136,6 @@ import com.cloud.utils.net.NetUtils;
import com.cloud.utils.script.OutputInterpreter;
import com.cloud.utils.script.Script;
import com.cloud.vm.SecondaryStorageVm;
import com.google.common.io.Files;
import static com.cloud.utils.S3Utils.putFile;
import static com.cloud.utils.StringUtils.join;
import static com.cloud.utils.db.GlobalLock.executeWithNoWaitLock;
import static java.lang.String.format;
import static java.util.Arrays.asList;
import static org.apache.commons.lang.StringUtils.substringAfterLast;
public class NfsSecondaryStorageResource extends ServerResourceBase implements SecondaryStorageResource {
@ -197,7 +197,7 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
}
public void setInSystemVM(boolean inSystemVM) {
this._inSystemVM = inSystemVM;
_inSystemVM = inSystemVM;
}
@Override
@ -297,7 +297,7 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
String finalFileName = templateFilename;
String finalDownloadPath = destPath + File.separator + templateFilename;
// compute the size of
long size = this._storage.getSize(downloadPath + File.separator + templateFilename);
long size = _storage.getSize(downloadPath + File.separator + templateFilename);
DataTO newDestTO = null;
@ -374,7 +374,7 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
protected Answer copySnapshotToTemplateFromNfsToNfsXenserver(CopyCommand cmd, SnapshotObjectTO srcData,
NfsTO srcDataStore, TemplateObjectTO destData, NfsTO destDataStore) {
String srcMountPoint = this.getRootDir(srcDataStore.getUrl());
String srcMountPoint = getRootDir(srcDataStore.getUrl());
String snapshotPath = srcData.getPath();
int index = snapshotPath.lastIndexOf("/");
String snapshotName = snapshotPath.substring(index + 1);
@ -384,16 +384,16 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
snapshotPath = snapshotPath.substring(0, index);
snapshotPath = srcMountPoint + File.separator + snapshotPath;
String destMountPoint = this.getRootDir(destDataStore.getUrl());
String destMountPoint = getRootDir(destDataStore.getUrl());
String destPath = destMountPoint + File.separator + destData.getPath();
String errMsg = null;
try {
this._storage.mkdir(destPath);
_storage.mkdir(destPath);
String templateUuid = UUID.randomUUID().toString();
String templateName = templateUuid + ".vhd";
Script command = new Script(this.createTemplateFromSnapshotXenScript, cmd.getWait() * 1000, s_logger);
Script command = new Script(createTemplateFromSnapshotXenScript, cmd.getWait() * 1000, s_logger);
command.add("-p", snapshotPath);
command.add("-s", snapshotName);
command.add("-n", templateName);
@ -468,7 +468,7 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
bufferWriter.write("\n");
bufferWriter.write("filename=" + fileName);
bufferWriter.write("\n");
long size = this._storage.getSize(destFileFullPath);
long size = _storage.getSize(destFileFullPath);
bufferWriter.write("size=" + size);
bufferWriter.close();
writer.close();
@ -630,7 +630,7 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
NfsTO destImageStore = (NfsTO) destDataStore;
if (srcDataStore instanceof S3TO) {
S3TO s3 = (S3TO) srcDataStore;
return this.copyFromS3ToNfs(cmd, srcData, s3, destData, destImageStore);
return copyFromS3ToNfs(cmd, srcData, s3, destData, destImageStore);
} else if (srcDataStore instanceof SwiftTO) {
return copyFromSwiftToNfs(cmd, srcData, (SwiftTO)srcDataStore, destData, destImageStore);
}
@ -857,9 +857,13 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
}
}
}
ImageFormat format = this.getTemplateFormat(srcFile.getName());
ImageFormat format = getTemplateFormat(srcFile.getName());
String key = destData.getPath() + S3Utils.SEPARATOR + srcFile.getName();
putFile(s3, srcFile, bucket, key);
if (s3.isMultipartEnabled()){
mputFile(s3, srcFile, bucket, key);
} else{
putFile(s3, srcFile, bucket, key);
}
DataTO retObj = null;
if (destData.getObjectType() == DataObjectType.TEMPLATE) {
@ -1271,9 +1275,9 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
int index = name.lastIndexOf(File.separator);
String snapshotPath = name.substring(0, index);
if (deleteAllFlag) {
lPath = this.getRootDir(secondaryStorageUrl) + File.separator + snapshotPath + File.separator + "*";
lPath = getRootDir(secondaryStorageUrl) + File.separator + snapshotPath + File.separator + "*";
} else {
lPath = this.getRootDir(secondaryStorageUrl) + File.separator + name + "*";
lPath = getRootDir(secondaryStorageUrl) + File.separator + name + "*";
}
final String result = deleteLocalFile(lPath);
@ -1461,7 +1465,7 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
Map<String, TemplateProp> s3ListTemplate(S3TO s3) {
String bucket = s3.getBucketName();
// List the objects in the source directory on S3
final List<S3ObjectSummary> objectSummaries = S3Utils.getDirectory(s3, bucket, this.TEMPLATE_ROOT_DIR);
final List<S3ObjectSummary> objectSummaries = S3Utils.getDirectory(s3, bucket, TEMPLATE_ROOT_DIR);
if (objectSummaries == null) {
return null;
}
@ -1470,7 +1474,7 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
String key = objectSummary.getKey();
// String installPath = StringUtils.substringBeforeLast(key,
// S3Utils.SEPARATOR);
String uniqueName = this.determineS3TemplateNameFromKey(key);
String uniqueName = determineS3TemplateNameFromKey(key);
// TODO: isPublic value, where to get?
TemplateProp tInfo = new TemplateProp(uniqueName, key, objectSummary.getSize(), objectSummary.getSize(),
true, false);
@ -1483,7 +1487,7 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
Map<Long, TemplateProp> s3ListVolume(S3TO s3) {
String bucket = s3.getBucketName();
// List the objects in the source directory on S3
final List<S3ObjectSummary> objectSummaries = S3Utils.getDirectory(s3, bucket, this.VOLUME_ROOT_DIR);
final List<S3ObjectSummary> objectSummaries = S3Utils.getDirectory(s3, bucket, VOLUME_ROOT_DIR);
if (objectSummaries == null) {
return null;
}
@ -1492,7 +1496,7 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
String key = objectSummary.getKey();
// String installPath = StringUtils.substringBeforeLast(key,
// S3Utils.SEPARATOR);
Long id = this.determineS3VolumeIdFromKey(key);
Long id = determineS3VolumeIdFromKey(key);
// TODO: how to get volume template name
TemplateProp tInfo = new TemplateProp(id.toString(), key, objectSummary.getSize(), objectSummary.getSize(),
true, false);

View File

@ -389,4 +389,6 @@ CREATE VIEW `cloud`.`volume_view` AS
`cloud`.`async_job` ON async_job.instance_id = volumes.id
and async_job.instance_type = 'Volume'
and async_job.job_status = 0;
INSERT IGNORE INTO `cloud`.`configuration`(category, instance, component, name, value, description, default_value) VALUES ('Advanced', 'DEFAULT', 'management-server', 's3.multipart.enabled', 'true', 'enable s3 multipart upload', 'true');

View File

@ -48,6 +48,7 @@ import org.apache.commons.lang.ArrayUtils;
import org.apache.log4j.Logger;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.HttpMethod;
import com.amazonaws.auth.AWSCredentials;
@ -61,6 +62,9 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.Upload;
import com.cloud.utils.exception.CloudRuntimeException;
public final class S3Utils {
@ -171,6 +175,58 @@ public final class S3Utils {
}
// multi-part upload file
public static void mputFile(final ClientOptions clientOptions,
final File sourceFile, final String bucketName, final String key) throws InterruptedException {
assert clientOptions != null;
assert sourceFile != null;
assert !isBlank(bucketName);
assert !isBlank(key);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(format("Multipart sending file %1$s as S3 object %2$s in "
+ "bucket %3$s", sourceFile.getName(), key, bucketName));
}
TransferManager tm = new TransferManager(S3Utils.acquireClient(clientOptions));
Upload upload = tm.upload(bucketName, key, sourceFile);
upload.waitForCompletion();
}
// multi-part upload object
public static void mputObject(final ClientOptions clientOptions,
final InputStream sourceStream, final String bucketName, final String key) throws InterruptedException {
assert clientOptions != null;
assert sourceStream != null;
assert !isBlank(bucketName);
assert !isBlank(key);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(format("Multipart sending stream as S3 object %1$s in "
+ "bucket %2$s", key, bucketName));
}
TransferManager tm = new TransferManager(S3Utils.acquireClient(clientOptions));
Upload upload = tm.upload(bucketName, key, sourceStream, null);
upload.waitForCompletion();
}
// multi-part upload object
public static void mputObject(final ClientOptions clientOptions,
final PutObjectRequest req) throws InterruptedException {
assert clientOptions != null;
assert req != null;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Multipart sending object to S3 using PutObjectRequest");
}
TransferManager tm = new TransferManager(S3Utils.acquireClient(clientOptions));
Upload upload = tm.upload(req);
upload.waitForCompletion();
}
public static void setObjectAcl(final ClientOptions clientOptions, final String bucketName, final String key,
final CannedAccessControlList acl) {