mirror of https://github.com/apache/cloudstack.git
volume upload: used netty server to do the file upload
This commit is contained in:
parent
41382f6f04
commit
643165a07e
|
|
@ -54,16 +54,12 @@
|
|||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<!-- dependencies for starting a post upload server on ssvm -->
|
||||
<dependency>
|
||||
<groupId>org.apache.httpcomponents</groupId>
|
||||
<artifactId>httpcore</artifactId>
|
||||
<version>${cs.httpcore.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.httpcomponents</groupId>
|
||||
<artifactId>httpcore-nio</artifactId>
|
||||
<version>${cs.httpcore.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-all</artifactId>
|
||||
<version>4.0.25.Final</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<build>
|
||||
<plugins>
|
||||
|
|
|
|||
|
|
@ -0,0 +1,253 @@
|
|||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
package org.apache.cloudstack.storage.resource;
|
||||
|
||||
import static io.netty.buffer.Unpooled.copiedBuffer;
|
||||
import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
|
||||
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH;
|
||||
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.cloudstack.storage.template.UploadEntity;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import com.cloud.exception.InvalidParameterValueException;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.handler.codec.http.DefaultFullHttpResponse;
|
||||
import io.netty.handler.codec.http.FullHttpResponse;
|
||||
import io.netty.handler.codec.http.HttpContent;
|
||||
import io.netty.handler.codec.http.HttpHeaders;
|
||||
import io.netty.handler.codec.http.HttpMethod;
|
||||
import io.netty.handler.codec.http.HttpObject;
|
||||
import io.netty.handler.codec.http.HttpRequest;
|
||||
import io.netty.handler.codec.http.HttpResponseStatus;
|
||||
import io.netty.handler.codec.http.HttpVersion;
|
||||
import io.netty.handler.codec.http.LastHttpContent;
|
||||
import io.netty.handler.codec.http.QueryStringDecoder;
|
||||
import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
|
||||
import io.netty.handler.codec.http.multipart.DiskFileUpload;
|
||||
import io.netty.handler.codec.http.multipart.FileUpload;
|
||||
import io.netty.handler.codec.http.multipart.HttpDataFactory;
|
||||
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
|
||||
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder.ErrorDataDecoderException;
|
||||
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder.IncompatibleDataDecoderException;
|
||||
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
|
||||
import io.netty.handler.codec.http.multipart.InterfaceHttpData.HttpDataType;
|
||||
import io.netty.util.CharsetUtil;
|
||||
|
||||
public class HttpUploadServerHandler extends SimpleChannelInboundHandler<HttpObject> {
|
||||
private static final Logger logger = Logger.getLogger(HttpUploadServerHandler.class.getName());
|
||||
|
||||
private static final HttpDataFactory factory = new DefaultHttpDataFactory(true);
|
||||
|
||||
private final StringBuilder responseContent = new StringBuilder();
|
||||
|
||||
private HttpRequest request;
|
||||
|
||||
private HttpPostRequestDecoder decoder;
|
||||
|
||||
private NfsSecondaryStorageResource storageResource;
|
||||
|
||||
private String uuid;
|
||||
|
||||
private static final String HEADER_SIGNATURE = "X-signature";
|
||||
|
||||
private static final String HEADER_METADATA = "X-metadata";
|
||||
|
||||
private static final String HEADER_EXPIRES = "X-expires";
|
||||
|
||||
private static final String HEADER_HOST = "X-Forwarded-Host";
|
||||
|
||||
public HttpUploadServerHandler(NfsSecondaryStorageResource storageResource) {
|
||||
this.storageResource = storageResource;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
|
||||
if (decoder != null) {
|
||||
decoder.cleanFiles();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
|
||||
if (msg instanceof HttpRequest) {
|
||||
HttpRequest request = this.request = (HttpRequest) msg;
|
||||
responseContent.setLength(0);
|
||||
|
||||
if (request.getMethod().equals(HttpMethod.POST)) {
|
||||
|
||||
URI uri = new URI(request.getUri());
|
||||
|
||||
String signature = null;
|
||||
String expires = null;
|
||||
String metadata = null;
|
||||
String hostname = null;
|
||||
for (Entry<String, String> entry : request.headers()) {
|
||||
switch (entry.getKey()) {
|
||||
case HEADER_SIGNATURE:
|
||||
signature = entry.getValue();
|
||||
break;
|
||||
case HEADER_METADATA:
|
||||
metadata = entry.getValue();
|
||||
break;
|
||||
case HEADER_EXPIRES:
|
||||
expires = entry.getValue();
|
||||
break;
|
||||
case HEADER_HOST:
|
||||
hostname = entry.getValue();
|
||||
break;
|
||||
}
|
||||
}
|
||||
logger.info("HEADER: signature=" + signature);
|
||||
logger.info("HEADER: metadata=" + metadata);
|
||||
logger.info("HEADER: expires=" + expires);
|
||||
logger.info("HEADER: hostname=" + hostname);
|
||||
QueryStringDecoder decoderQuery = new QueryStringDecoder(uri);
|
||||
Map<String, List<String>> uriAttributes = decoderQuery.parameters();
|
||||
uuid = uriAttributes.get("uuid").get(0);
|
||||
logger.info("URI: uuid=" + uuid);
|
||||
|
||||
UploadEntity uploadEntity = null;
|
||||
try {
|
||||
// Validate the request here
|
||||
storageResource.validatePostUploadRequest(signature, metadata, expires, hostname, uuid);
|
||||
//create an upload entity. This will fail if entity already exists.
|
||||
uploadEntity = storageResource.createUploadEntity(uuid, metadata);
|
||||
} catch (InvalidParameterValueException ex) {
|
||||
logger.error("post request validation failed", ex);
|
||||
responseContent.append(ex.getMessage());
|
||||
writeResponse(ctx.channel());
|
||||
return;
|
||||
}
|
||||
if (uploadEntity == null) {
|
||||
logger.error("Unable to create upload entity. An exception occurred.");
|
||||
responseContent.append("Internal Server Error");
|
||||
writeResponse(ctx.channel());
|
||||
return;
|
||||
}
|
||||
//set the base directory to download the file
|
||||
DiskFileUpload.baseDirectory = uploadEntity.getInstallPathPrefix();
|
||||
logger.info("base directory: " + DiskFileUpload.baseDirectory);
|
||||
try {
|
||||
//initialize the decoder
|
||||
decoder = new HttpPostRequestDecoder(factory, request);
|
||||
} catch (ErrorDataDecoderException | IncompatibleDataDecoderException e) {
|
||||
logger.error("exception while initialising the decoder", e);
|
||||
responseContent.append(e.getMessage());
|
||||
writeResponse(ctx.channel());
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
logger.warn("received a get request");
|
||||
responseContent.append("only post requests are allowed");
|
||||
writeResponse(ctx.channel());
|
||||
return;
|
||||
}
|
||||
|
||||
}
|
||||
// check if the decoder was constructed before
|
||||
if (decoder != null) {
|
||||
if (msg instanceof HttpContent) {
|
||||
// New chunk is received
|
||||
HttpContent chunk = (HttpContent) msg;
|
||||
try {
|
||||
decoder.offer(chunk);
|
||||
} catch (ErrorDataDecoderException e) {
|
||||
logger.error("data decoding exception", e);
|
||||
responseContent.append(e.getMessage());
|
||||
writeResponse(ctx.channel());
|
||||
return;
|
||||
}
|
||||
if (chunk instanceof LastHttpContent) {
|
||||
readFileUploadData();
|
||||
writeResponse(ctx.channel());
|
||||
reset();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void reset() {
|
||||
request = null;
|
||||
// destroy the decoder to release all resources
|
||||
decoder.destroy();
|
||||
decoder = null;
|
||||
}
|
||||
|
||||
private void readFileUploadData() throws IOException {
|
||||
while (decoder.hasNext()) {
|
||||
InterfaceHttpData data = decoder.next();
|
||||
if (data != null) {
|
||||
try {
|
||||
logger.info("BODY FileUpload: " + data.getHttpDataType().name() + ": " + data);
|
||||
if (data.getHttpDataType() == HttpDataType.FileUpload) {
|
||||
FileUpload fileUpload = (FileUpload) data;
|
||||
if (fileUpload.isCompleted()) {
|
||||
responseContent.append("upload successful.");
|
||||
storageResource.postUpload(uuid, fileUpload.getFile().getName());
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
data.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void writeResponse(Channel channel) {
|
||||
// Convert the response content to a ChannelBuffer.
|
||||
ByteBuf buf = copiedBuffer(responseContent.toString(), CharsetUtil.UTF_8);
|
||||
responseContent.setLength(0);
|
||||
// Decide whether to close the connection or not.
|
||||
boolean close = HttpHeaders.Values.CLOSE.equalsIgnoreCase(request.headers().get(CONNECTION)) ||
|
||||
request.getProtocolVersion().equals(HttpVersion.HTTP_1_0) && !HttpHeaders.Values.KEEP_ALIVE.equalsIgnoreCase(request.headers().get(CONNECTION));
|
||||
// Build the response object.
|
||||
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, buf);
|
||||
response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
|
||||
if (!close) {
|
||||
// There's no need to add 'Content-Length' header if this is the last response.
|
||||
response.headers().set(CONTENT_LENGTH, buf.readableBytes());
|
||||
}
|
||||
// Write the response.
|
||||
ChannelFuture future = channel.writeAndFlush(response);
|
||||
// Close the connection after the write operation is done if necessary.
|
||||
if (close) {
|
||||
future.addListener(ChannelFutureListener.CLOSE);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
logger.warn(responseContent.toString(), cause);
|
||||
responseContent.append("\r\nException occurred: ").append(cause.getMessage());
|
||||
writeResponse(ctx.channel());
|
||||
ctx.channel().close();
|
||||
}
|
||||
}
|
||||
|
|
@ -25,8 +25,6 @@ import static org.apache.commons.lang.StringUtils.substringAfterLast;
|
|||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
|
|
@ -34,70 +32,50 @@ import java.io.FileReader;
|
|||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStream;
|
||||
import java.math.BigInteger;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.UnknownHostException;
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import javax.naming.ConfigurationException;
|
||||
|
||||
import com.cloud.exception.InvalidParameterValueException;
|
||||
import com.cloud.storage.Storage;
|
||||
import com.cloud.utils.EncryptionUtil;
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
import io.netty.handler.codec.http.HttpContentCompressor;
|
||||
import io.netty.handler.codec.http.HttpRequestDecoder;
|
||||
import io.netty.handler.codec.http.HttpResponseEncoder;
|
||||
import io.netty.handler.logging.LogLevel;
|
||||
import io.netty.handler.logging.LoggingHandler;
|
||||
import org.apache.cloudstack.storage.command.TemplateOrVolumePostUploadCommand;
|
||||
import org.apache.cloudstack.storage.template.UploadEntity;
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
import org.apache.commons.httpclient.HttpStatus;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.http.HttpEntity;
|
||||
import org.apache.http.HttpEntityEnclosingRequest;
|
||||
import org.apache.http.HttpException;
|
||||
import org.apache.http.HttpRequest;
|
||||
import org.apache.http.HttpResponse;
|
||||
import org.apache.http.NameValuePair;
|
||||
import org.apache.http.client.HttpClient;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.client.utils.URLEncodedUtils;
|
||||
import org.apache.http.config.ConnectionConfig;
|
||||
import org.apache.http.entity.StringEntity;
|
||||
import org.apache.http.impl.client.DefaultHttpClient;
|
||||
import org.apache.http.impl.nio.DefaultHttpServerIODispatch;
|
||||
import org.apache.http.impl.nio.DefaultNHttpServerConnection;
|
||||
import org.apache.http.impl.nio.DefaultNHttpServerConnectionFactory;
|
||||
import org.apache.http.impl.nio.reactor.DefaultListeningIOReactor;
|
||||
import org.apache.http.impl.nio.reactor.IOReactorConfig;
|
||||
import org.apache.http.nio.NHttpConnectionFactory;
|
||||
import org.apache.http.nio.NHttpServerConnection;
|
||||
import org.apache.http.nio.protocol.BasicAsyncRequestConsumer;
|
||||
import org.apache.http.nio.protocol.BasicAsyncResponseProducer;
|
||||
import org.apache.http.nio.protocol.HttpAsyncExchange;
|
||||
import org.apache.http.nio.protocol.HttpAsyncRequestConsumer;
|
||||
import org.apache.http.nio.protocol.HttpAsyncRequestHandler;
|
||||
import org.apache.http.nio.protocol.HttpAsyncService;
|
||||
import org.apache.http.nio.protocol.UriHttpAsyncRequestHandlerMapper;
|
||||
import org.apache.http.nio.reactor.IOEventDispatch;
|
||||
import org.apache.http.nio.reactor.ListeningIOReactor;
|
||||
import org.apache.http.protocol.HttpContext;
|
||||
import org.apache.http.protocol.HttpProcessor;
|
||||
import org.apache.http.protocol.HttpProcessorBuilder;
|
||||
import org.apache.http.protocol.ResponseConnControl;
|
||||
import org.apache.http.protocol.ResponseContent;
|
||||
import org.apache.http.protocol.ResponseDate;
|
||||
import org.apache.http.protocol.ResponseServer;
|
||||
import org.apache.http.util.EntityUtils;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import com.amazonaws.services.s3.model.S3ObjectSummary;
|
||||
|
|
@ -181,6 +159,8 @@ import com.cloud.utils.net.NetUtils;
|
|||
import com.cloud.utils.script.OutputInterpreter;
|
||||
import com.cloud.utils.script.Script;
|
||||
import com.cloud.vm.SecondaryStorageVm;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.format.ISODateTimeFormat;
|
||||
|
||||
public class NfsSecondaryStorageResource extends ServerResourceBase implements SecondaryStorageResource {
|
||||
|
||||
|
|
@ -228,6 +208,7 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
|
|||
final private String _tmpltpp = "template.properties";
|
||||
protected String createTemplateFromSnapshotXenScript;
|
||||
private HashMap<String,UploadEntity> uploadEntityStateMap = new HashMap<String,UploadEntity>();
|
||||
private String _ssvmPSK = null;
|
||||
|
||||
public void setParentPath(String path) {
|
||||
_parent = path;
|
||||
|
|
@ -1342,74 +1323,48 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
|
|||
}
|
||||
|
||||
savePostUploadPSK(cmd.getPostUploadKey());
|
||||
try {
|
||||
startNioServerForPostUpload();
|
||||
} catch (IOException e) {
|
||||
//returning TRUE even if this fails as this will only disable post upload functionality
|
||||
s_logger.error("exception while starting the nio server for post upload.", e);
|
||||
}
|
||||
startPostUploadServer();
|
||||
return answer;
|
||||
}
|
||||
|
||||
private void startNioServerForPostUpload() throws IOException {
|
||||
//TODO: make port configurable.
|
||||
final int port = 8210;
|
||||
// Create HTTP protocol processing chain
|
||||
HttpProcessor httpproc = HttpProcessorBuilder.create()
|
||||
.add(new ResponseDate())
|
||||
.add(new ResponseServer("HTTP/1.1"))
|
||||
.add(new ResponseContent())
|
||||
.add(new ResponseConnControl()).build();
|
||||
// Create request handler registry
|
||||
UriHttpAsyncRequestHandlerMapper reqistry = new UriHttpAsyncRequestHandlerMapper();
|
||||
// Register the default handler for all URIs
|
||||
reqistry.register("/upload*", new PostUploadRequestHandler(_params,this,_dlMgr));
|
||||
// Create server-side HTTP protocol handler
|
||||
HttpAsyncService protocolHandler = new HttpAsyncService(httpproc, reqistry) {
|
||||
|
||||
private void startPostUploadServer() {
|
||||
final int PORT = 8210;
|
||||
final int NO_OF_WORKERS = 15;
|
||||
final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
|
||||
final EventLoopGroup workerGroup = new NioEventLoopGroup(NO_OF_WORKERS);
|
||||
final ServerBootstrap b = new ServerBootstrap();
|
||||
final NfsSecondaryStorageResource storageResource = this;
|
||||
b.group(bossGroup, workerGroup);
|
||||
b.channel(NioServerSocketChannel.class);
|
||||
b.handler(new LoggingHandler(LogLevel.INFO));
|
||||
b.childHandler(new ChannelInitializer<SocketChannel>() {
|
||||
@Override
|
||||
public void connected(final NHttpServerConnection conn) {
|
||||
s_logger.info(conn + ": connection open");
|
||||
super.connected(conn);
|
||||
protected void initChannel(SocketChannel ch) throws Exception {
|
||||
ChannelPipeline pipeline = ch.pipeline();
|
||||
pipeline.addLast(new HttpRequestDecoder());
|
||||
pipeline.addLast(new HttpResponseEncoder());
|
||||
pipeline.addLast(new HttpContentCompressor());
|
||||
pipeline.addLast(new HttpUploadServerHandler(storageResource));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void closed(final NHttpServerConnection conn) {
|
||||
s_logger.info(conn + ": connection closed");
|
||||
super.closed(conn);
|
||||
}
|
||||
|
||||
};
|
||||
// Create HTTP connection factory
|
||||
NHttpConnectionFactory<DefaultNHttpServerConnection> connFactory;
|
||||
|
||||
connFactory = new DefaultNHttpServerConnectionFactory(
|
||||
ConnectionConfig.DEFAULT);
|
||||
// Create server-side I/O event dispatch
|
||||
final IOEventDispatch ioEventDispatch = new DefaultHttpServerIODispatch(protocolHandler, connFactory);
|
||||
// Set I/O reactor defaults
|
||||
IOReactorConfig config = IOReactorConfig.custom()
|
||||
.setIoThreadCount(15)
|
||||
.setSoTimeout(3000)
|
||||
.setConnectTimeout(3000)
|
||||
.build();
|
||||
|
||||
// Create server-side I/O reactor
|
||||
final ListeningIOReactor ioReactor = new DefaultListeningIOReactor(config);
|
||||
});
|
||||
new Thread() {
|
||||
@Override public void run() {
|
||||
// Listen of the given port
|
||||
ioReactor.listen(new InetSocketAddress(port));
|
||||
// Ready to go!
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
ioReactor.execute(ioEventDispatch);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Exception while starting the post upload server on port: " + port);
|
||||
Channel ch = b.bind(PORT).sync().channel();
|
||||
s_logger.info(String.format("Started post upload server on port %d with %d workers",PORT,NO_OF_WORKERS));
|
||||
ch.closeFuture().sync();
|
||||
} catch (InterruptedException e) {
|
||||
s_logger.info("Failed to start post upload server");
|
||||
s_logger.debug("Exception while starting post upload server", e);
|
||||
} finally {
|
||||
bossGroup.shutdownGracefully();
|
||||
workerGroup.shutdownGracefully();
|
||||
s_logger.info("shutting down post upload server");
|
||||
}
|
||||
s_logger.info("Nio server for post upload on port: " + port + " is shutdown.");
|
||||
}
|
||||
}.start();
|
||||
s_logger.info("Started Nioserver for post upload on port: " + port);
|
||||
s_logger.info("created a thread to start post upload server");
|
||||
}
|
||||
|
||||
private void savePostUploadPSK(String psk) {
|
||||
|
|
@ -1721,21 +1676,21 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
|
|||
}
|
||||
|
||||
private UploadStatusAnswer execute(UploadStatusCommand cmd) {
|
||||
long entityId = cmd.getEntityId();
|
||||
if (uploadEntityStateMap.containsKey(entityId)) {
|
||||
UploadEntity uploadEntity = uploadEntityStateMap.get(entityId);
|
||||
if (uploadEntity.getUploadState()== UploadEntity.Status.ERROR) {
|
||||
uploadEntityStateMap.remove(entityId);
|
||||
String entityUuid = cmd.getEntityUuid();
|
||||
if (uploadEntityStateMap.containsKey(entityUuid)) {
|
||||
UploadEntity uploadEntity = uploadEntityStateMap.get(entityUuid);
|
||||
if (uploadEntity.getUploadState() == UploadEntity.Status.ERROR) {
|
||||
uploadEntityStateMap.remove(entityUuid);
|
||||
return new UploadStatusAnswer(cmd, UploadStatus.ERROR, uploadEntity.getErrorMessage());
|
||||
}else if (uploadEntity.getUploadState()== UploadEntity.Status.COMPLETED) {
|
||||
} else if (uploadEntity.getUploadState() == UploadEntity.Status.COMPLETED) {
|
||||
UploadStatusAnswer answer = new UploadStatusAnswer(cmd, UploadStatus.COMPLETED);
|
||||
answer.setVirtualSize(uploadEntity.getVirtualSize());
|
||||
answer.setInstallPath(uploadEntity.getTmpltPath());
|
||||
answer.setPhysicalSize(uploadEntity.getEntitysize());
|
||||
uploadEntityStateMap.remove(entityId);
|
||||
answer.setPhysicalSize(uploadEntity.getPhysicalSize());
|
||||
uploadEntityStateMap.remove(entityUuid);
|
||||
return answer;
|
||||
}else if (uploadEntity.getUploadState()==UploadEntity.Status.IN_PROGRESS) {
|
||||
return new UploadStatusAnswer(cmd,UploadStatus.IN_PROGRESS);
|
||||
} else if (uploadEntity.getUploadState() == UploadEntity.Status.IN_PROGRESS) {
|
||||
return new UploadStatusAnswer(cmd, UploadStatus.IN_PROGRESS);
|
||||
}
|
||||
}
|
||||
return new UploadStatusAnswer(cmd, UploadStatus.UNKNOWN);
|
||||
|
|
@ -2630,357 +2585,224 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
|
|||
}
|
||||
}
|
||||
|
||||
//TODO: move this class to a separate file
|
||||
private class PostUploadRequestHandler implements HttpAsyncRequestHandler<HttpRequest> {
|
||||
//private static final Logger s_logger = Logger.getLogger(PostUploadRequestHandler.class);
|
||||
private String getScriptLocation(UploadEntity.ResourceType resourceType) {
|
||||
|
||||
private String createTmpltScr;
|
||||
private String createVolScr;
|
||||
private String scriptsDir;
|
||||
private int installTimeoutPerGig = 180 * 60 * 1000;
|
||||
private Map<String,ImageFormat> formatMap = new HashMap<>();
|
||||
private DownloadManager _downloadMgr;
|
||||
private SecondaryStorageResource _resource;
|
||||
|
||||
public PostUploadRequestHandler(Map<String, Object> params, SecondaryStorageResource resource, DownloadManager dlmgr) {
|
||||
configure(params, dlmgr);
|
||||
this._resource=resource;
|
||||
String scriptsDir = (String) _params.get("template.scripts.dir");
|
||||
if (scriptsDir == null) {
|
||||
scriptsDir = "scripts/storage/secondary";
|
||||
}
|
||||
String scriptname = null;
|
||||
if (resourceType == UploadEntity.ResourceType.VOLUME) {
|
||||
scriptname = "createvolume.sh";
|
||||
} else if (resourceType == UploadEntity.ResourceType.TEMPLATE) {
|
||||
scriptname = "createtmplt.sh";
|
||||
} else {
|
||||
throw new InvalidParameterValueException("cannot find script for resource type: " + resourceType);
|
||||
}
|
||||
return Script.findScript(scriptsDir, scriptname);
|
||||
}
|
||||
|
||||
public void configure(Map<String, Object> params, DownloadManager dlmgr) {
|
||||
|
||||
_downloadMgr=dlmgr;
|
||||
String scriptsDir = (String)params.get("template.scripts.dir");
|
||||
if (scriptsDir == null) {
|
||||
scriptsDir = "scripts/storage/secondary";
|
||||
public UploadEntity createUploadEntity(String uuid, String metadata) {
|
||||
TemplateOrVolumePostUploadCommand cmd = getTemplateOrVolumePostUploadCmd(metadata);
|
||||
UploadEntity uploadEntity = null;
|
||||
if(cmd == null ){
|
||||
updateStateMapWithError(uuid,"unable decode and deserialize metadata.");
|
||||
throw new InvalidParameterValueException("unable to decode and deserialize metadata");
|
||||
} else {
|
||||
uuid = cmd.getEntityUUID();
|
||||
if(uploadEntityStateMap.containsKey(uuid)) {
|
||||
uploadEntity = uploadEntityStateMap.get(uuid);
|
||||
throw new InvalidParameterValueException("The one time post url is already used and the upload is in " + uploadEntity.getUploadState() + " state.");
|
||||
}
|
||||
try {
|
||||
createTmpltScr = Script.findScript(scriptsDir, "createtmplt.sh");
|
||||
if (createTmpltScr == null) {
|
||||
throw new ConfigurationException("Unable to find createtmplt.sh");
|
||||
}
|
||||
s_logger.info("createtmplt.sh found in " + createTmpltScr);
|
||||
createVolScr = Script.findScript(scriptsDir, "createvolume.sh");
|
||||
if (createVolScr == null) {
|
||||
throw new ConfigurationException("Unable to find createvolume.sh");
|
||||
}
|
||||
s_logger.info("createvolume.sh found in " + createVolScr);
|
||||
|
||||
}catch (Exception e) {
|
||||
s_logger.debug("failed to configure the postupload handler "+e);
|
||||
}
|
||||
|
||||
formatMap.put("qcow2",ImageFormat.QCOW2);
|
||||
formatMap.put("vhd",ImageFormat.VHD);
|
||||
formatMap.put("iso",ImageFormat.ISO);
|
||||
formatMap.put("vmdk",ImageFormat.VMDK);
|
||||
formatMap.put("tar",ImageFormat.TAR);
|
||||
formatMap.put("ova",ImageFormat.OVA);
|
||||
|
||||
}
|
||||
|
||||
public void handleuplod(long entityId, String installPathPrefix, String uuid, String localTemppath, String entityName, long entitySize, String resourceType, String imageFormat, byte[] data, boolean isHvm, String chksum, String dataStoreUrl) {
|
||||
UploadEntity uploadEntity=null;
|
||||
try {
|
||||
|
||||
if (uploadEntityStateMap.containsKey(uuid)) {
|
||||
//the file upload entity has been created.
|
||||
uploadEntity = uploadEntityStateMap.get(uuid);
|
||||
if (uploadEntity.getUploadState() == UploadEntity.Status.ERROR) {
|
||||
s_logger.debug("received data write requres for a uplod entity in error state. Ignoring.");
|
||||
}else if (uploadEntity.getUploadState()== UploadEntity.Status.COMPLETED) {
|
||||
s_logger.debug("received a write request for entity which is alredy downloaded. Ignoring.");
|
||||
}else if (uploadEntity.getUploadState()== UploadEntity.Status.IN_PROGRESS) {
|
||||
uploadEntity.getFilewriter().write(data);
|
||||
uploadEntity.incremetByteCount(data.length);
|
||||
if (uploadEntity.getDownloadedsize() == uploadEntity.getEntitysize()) {
|
||||
postUpload(uploadEntity);
|
||||
} else {
|
||||
uploadEntity.setStatus(UploadEntity.Status.IN_PROGRESS);
|
||||
}
|
||||
}
|
||||
}else{
|
||||
//this is a new upload.
|
||||
uploadEntity = new UploadEntity(uuid, entityId, entitySize, UploadEntity.Status.IN_PROGRESS, entityName, installPathPrefix);
|
||||
uploadEntity.setResourceType("volume".equalsIgnoreCase(resourceType) ? UploadEntity.ResourceType.VOLUME : UploadEntity.ResourceType.TEMPLATE);
|
||||
uploadEntity.setFormat(formatMap.get(imageFormat.toLowerCase()));
|
||||
//relative path wihour ssvm mount info.
|
||||
uploadEntity.setTemplatePath(installPathPrefix);
|
||||
installPathPrefix = _resource.getRootDir(dataStoreUrl) + File.separator + installPathPrefix;
|
||||
uploadEntity.setInstallPathPrefix(installPathPrefix);
|
||||
uploadEntity.setEntitysize(entitySize);
|
||||
uploadEntity.setHvm(isHvm);
|
||||
uploadEntity.setFilename(entityName);
|
||||
if (!_storage.exists(installPathPrefix)) {
|
||||
_storage.mkdir(installPathPrefix);
|
||||
}
|
||||
File tempFile = File.createTempFile("dnld_", entityName,new File(installPathPrefix));
|
||||
uploadEntity.setFile(tempFile);
|
||||
FileOutputStream filewriter = new FileOutputStream(tempFile.getAbsolutePath());
|
||||
uploadEntity.setFilewriter(filewriter);
|
||||
filewriter.write(data);
|
||||
uploadEntity.setFile(tempFile);
|
||||
uploadEntity.incremetByteCount(data.length);
|
||||
uploadEntityStateMap.put(uuid, uploadEntity);
|
||||
if (uploadEntity.getDownloadedsize() == uploadEntity.getEntitysize()) {
|
||||
postUpload(uploadEntity);
|
||||
|
||||
} else {
|
||||
uploadEntity.setStatus(UploadEntity.Status.IN_PROGRESS);
|
||||
}
|
||||
String absolutePath = cmd.getAbsolutePath();
|
||||
uploadEntity = new UploadEntity(uuid, cmd.getEntityId(), UploadEntity.Status.IN_PROGRESS, cmd.getName(), absolutePath);
|
||||
uploadEntity.setMetaDataPopulated(true);
|
||||
uploadEntity.setResourceType(UploadEntity.ResourceType.valueOf(cmd.getType()));
|
||||
uploadEntity.setFormat(Storage.ImageFormat.valueOf(cmd.getImageFormat()));
|
||||
//relative path with out ssvm mount info.
|
||||
uploadEntity.setTemplatePath(absolutePath);
|
||||
String dataStoreUrl = cmd.getDataTo();
|
||||
String installPathPrefix = this.getRootDir(dataStoreUrl) + File.separator + absolutePath;
|
||||
uploadEntity.setInstallPathPrefix(installPathPrefix);
|
||||
uploadEntity.setHvm(cmd.getRequiresHvm());
|
||||
uploadEntity.setChksum(cmd.getChecksum());
|
||||
// create a install dir
|
||||
if (!_storage.exists(installPathPrefix)) {
|
||||
_storage.mkdir(installPathPrefix);
|
||||
}
|
||||
uploadEntityStateMap.put(uuid, uploadEntity);
|
||||
} catch (Exception e) {
|
||||
uploadEntity.setErrorMessage(e.getMessage());
|
||||
uploadEntity.setStatus(UploadEntity.Status.ERROR);
|
||||
try {
|
||||
uploadEntity.getFilewriter().close();
|
||||
}
|
||||
catch (Exception io) {
|
||||
s_logger.debug("exception occured when closing a file object " + io.getMessage());
|
||||
}
|
||||
s_logger.debug("exception occured while writing to a file " + e);
|
||||
//upload entity will be null incase an exception occurs and the handler will not proceed.
|
||||
s_logger.debug("exception occured while creating upload entity " + e);
|
||||
updateStateMapWithError(uuid, e.getMessage());
|
||||
}
|
||||
}
|
||||
public String postUpload(UploadEntity uploadEntity) {
|
||||
return uploadEntity;
|
||||
}
|
||||
|
||||
String resourcePath = uploadEntity.getInstallPathPrefix(); // install path prefix // path with mount
|
||||
// directory
|
||||
String finalResourcePath = uploadEntity.getTmpltPath(); // template download
|
||||
// path on secondary
|
||||
// storage
|
||||
UploadEntity.ResourceType resourceType = uploadEntity.getResourceType();
|
||||
|
||||
File originalTemplate = uploadEntity.getFile();
|
||||
//String checkSum = computeCheckSum(originalTemplate);
|
||||
//if (checkSum == null) {
|
||||
// s_logger.warn("Something wrong happened when try to calculate the checksum of downloaded template!");
|
||||
//}
|
||||
//dnld.setCheckSum(checkSum);
|
||||
public String postUpload(String uuid, String filename) {
|
||||
UploadEntity uploadEntity = uploadEntityStateMap.get(uuid);
|
||||
int installTimeoutPerGig = 180 * 60 * 1000;
|
||||
|
||||
int imgSizeGigs = (int)Math.ceil(_storage.getSize(originalTemplate.getPath()) * 1.0d / (1024 * 1024 * 1024));
|
||||
imgSizeGigs++; // add one just in case
|
||||
long timeout = (long)imgSizeGigs * installTimeoutPerGig;
|
||||
Script scr = null;
|
||||
String script = resourceType == UploadEntity.ResourceType.TEMPLATE ? createTmpltScr : createVolScr;
|
||||
scr = new Script(script, timeout, s_logger);
|
||||
scr.add("-s", Integer.toString(imgSizeGigs));
|
||||
scr.add("-S", Long.toString(UploadEntity.s_maxTemplateSize));
|
||||
//if (uploadEntity.getDescription() != null && dnld.getDescription().length() > 1) {
|
||||
// scr.add("-d", dnld.getDescription());
|
||||
//}
|
||||
if (uploadEntity.isHvm()) {
|
||||
scr.add("-h");
|
||||
}
|
||||
String resourcePath = uploadEntity.getInstallPathPrefix();
|
||||
String finalResourcePath = uploadEntity.getTmpltPath(); // template download
|
||||
UploadEntity.ResourceType resourceType = uploadEntity.getResourceType();
|
||||
|
||||
// add options common to ISO and template
|
||||
String extension = uploadEntity.getFormat().getFileExtension();
|
||||
String templateName = "";
|
||||
if (extension.equals("iso")) {
|
||||
templateName = uploadEntity.getUuid().trim().replace(" ", "_");
|
||||
} else {
|
||||
templateName = java.util.UUID.nameUUIDFromBytes((uploadEntity.getFilename() + System.currentTimeMillis()).getBytes()).toString();
|
||||
}
|
||||
String fileSavedTempLocation = uploadEntity.getInstallPathPrefix() + "/" + filename;
|
||||
//String checkSum = computeCheckSum(originalTemplate);
|
||||
//if (checkSum == null) {
|
||||
// s_logger.warn("Something wrong happened when try to calculate the checksum of downloaded template!");
|
||||
//}
|
||||
//dnld.setCheckSum(checkSum);
|
||||
|
||||
// run script to mv the temporary template file to the final template
|
||||
// file
|
||||
String templateFilename = templateName + "." + extension;
|
||||
uploadEntity.setTemplatePath(finalResourcePath + "/" + templateFilename);
|
||||
scr.add("-n", templateFilename);
|
||||
int imgSizeGigs = (int)Math.ceil(_storage.getSize(fileSavedTempLocation) * 1.0d / (1024 * 1024 * 1024));
|
||||
imgSizeGigs++; // add one just in case
|
||||
long timeout = (long)imgSizeGigs * installTimeoutPerGig;
|
||||
Script scr = new Script(getScriptLocation(resourceType), timeout, s_logger);
|
||||
scr.add("-s", Integer.toString(imgSizeGigs));
|
||||
scr.add("-S", Long.toString(UploadEntity.s_maxTemplateSize));
|
||||
//if (uploadEntity.getDescription() != null && dnld.getDescription().length() > 1) {
|
||||
// scr.add("-d", dnld.getDescription());
|
||||
//}
|
||||
if (uploadEntity.isHvm()) {
|
||||
scr.add("-h");
|
||||
}
|
||||
|
||||
scr.add("-t", resourcePath);
|
||||
scr.add("-f", uploadEntity.getFile().getAbsolutePath()); // this is the temporary
|
||||
// template file downloaded
|
||||
if (uploadEntity.getChksum() != null && uploadEntity.getChksum().length() > 1) {
|
||||
scr.add("-c", uploadEntity.getChksum());
|
||||
}
|
||||
scr.add("-u"); // cleanup
|
||||
String result;
|
||||
result = scr.execute();
|
||||
// add options common to ISO and template
|
||||
String extension = uploadEntity.getFormat().getFileExtension();
|
||||
String templateName = "";
|
||||
if (extension.equals("iso")) {
|
||||
templateName = uploadEntity.getUuid().trim().replace(" ", "_");
|
||||
} else {
|
||||
templateName = java.util.UUID.nameUUIDFromBytes((uploadEntity.getFilename() + System.currentTimeMillis()).getBytes()).toString();
|
||||
}
|
||||
|
||||
if (result != null) {
|
||||
return result;
|
||||
}
|
||||
// run script to mv the temporary template file to the final template
|
||||
// file
|
||||
String templateFilename = templateName + "." + extension;
|
||||
uploadEntity.setTemplatePath(finalResourcePath + "/" + templateFilename);
|
||||
scr.add("-n", templateFilename);
|
||||
|
||||
// Set permissions for the downloaded template
|
||||
File downloadedTemplate = new File(resourcePath + "/" + templateFilename);
|
||||
_storage.setWorldReadableAndWriteable(downloadedTemplate);
|
||||
scr.add("-t", resourcePath);
|
||||
scr.add("-f", fileSavedTempLocation); // this is the temporary
|
||||
// template file downloaded
|
||||
if (uploadEntity.getChksum() != null && uploadEntity.getChksum().length() > 1) {
|
||||
scr.add("-c", uploadEntity.getChksum());
|
||||
}
|
||||
scr.add("-u"); // cleanup
|
||||
String result;
|
||||
result = scr.execute();
|
||||
|
||||
// Set permissions for template/volume.properties
|
||||
String propertiesFile = resourcePath;
|
||||
if (resourceType == UploadEntity.ResourceType.TEMPLATE) {
|
||||
propertiesFile += "/template.properties";
|
||||
} else {
|
||||
propertiesFile += "/volume.properties";
|
||||
}
|
||||
File templateProperties = new File(propertiesFile);
|
||||
_storage.setWorldReadableAndWriteable(templateProperties);
|
||||
if (result != null) {
|
||||
return result;
|
||||
}
|
||||
|
||||
TemplateLocation loc = new TemplateLocation(_storage, resourcePath);
|
||||
// Set permissions for the downloaded template
|
||||
File downloadedTemplate = new File(resourcePath + "/" + templateFilename);
|
||||
_storage.setWorldReadableAndWriteable(downloadedTemplate);
|
||||
|
||||
// Set permissions for template/volume.properties
|
||||
String propertiesFile = resourcePath;
|
||||
if (resourceType == UploadEntity.ResourceType.TEMPLATE) {
|
||||
propertiesFile += "/template.properties";
|
||||
} else {
|
||||
propertiesFile += "/volume.properties";
|
||||
}
|
||||
File templateProperties = new File(propertiesFile);
|
||||
_storage.setWorldReadableAndWriteable(templateProperties);
|
||||
|
||||
TemplateLocation loc = new TemplateLocation(_storage, resourcePath);
|
||||
try {
|
||||
loc.create(uploadEntity.getEntityId(), true, uploadEntity.getFilename());
|
||||
} catch (IOException e) {
|
||||
s_logger.warn("Something is wrong with template location " + resourcePath, e);
|
||||
loc.purge();
|
||||
return "Unable to download due to " + e.getMessage();
|
||||
}
|
||||
|
||||
Map<String, Processor> processors = _dlMgr.getProcessors();
|
||||
for (Processor processor : processors.values()) {
|
||||
FormatInfo info = null;
|
||||
try {
|
||||
loc.create(uploadEntity.getEntityId(), true, uploadEntity.getFilename());
|
||||
info = processor.process(resourcePath, null, templateName);
|
||||
} catch (InternalErrorException e) {
|
||||
s_logger.error("Template process exception ", e);
|
||||
return e.toString();
|
||||
}
|
||||
if (info != null) {
|
||||
loc.addFormat(info);
|
||||
uploadEntity.setVirtualSize(info.virtualSize);
|
||||
uploadEntity.setPhysicalSize(info.size);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!loc.save()) {
|
||||
s_logger.warn("Cleaning up because we're unable to save the formats");
|
||||
loc.purge();
|
||||
}
|
||||
uploadEntity.setStatus(UploadEntity.Status.COMPLETED);
|
||||
uploadEntityStateMap.put(uploadEntity.getUuid(), uploadEntity);
|
||||
return null;
|
||||
}
|
||||
|
||||
private String getPostUploadPSK() {
|
||||
if(_ssvmPSK == null ) {
|
||||
try {
|
||||
_ssvmPSK = FileUtils.readFileToString(new File(POST_UPLOAD_KEY_LOCATION), "utf-8");
|
||||
} catch (IOException e) {
|
||||
s_logger.warn("Something is wrong with template location " + resourcePath, e);
|
||||
loc.purge();
|
||||
return "Unable to download due to " + e.getMessage();
|
||||
s_logger.debug("Error while reading SSVM PSK from location " + POST_UPLOAD_KEY_LOCATION, e);
|
||||
}
|
||||
}
|
||||
return _ssvmPSK;
|
||||
}
|
||||
|
||||
Iterator<Processor> en = _downloadMgr.getProcessesors().values().iterator();
|
||||
while (en.hasNext()) {
|
||||
Processor processor = en.next();
|
||||
private void updateStateMapWithError(String uuid,String errorMessage) {
|
||||
UploadEntity uploadEntity=null;
|
||||
if (uploadEntityStateMap.get(uuid)!=null) {
|
||||
uploadEntity=uploadEntityStateMap.get(uuid);
|
||||
}else {
|
||||
uploadEntity= new UploadEntity();
|
||||
}
|
||||
uploadEntity.setStatus(UploadEntity.Status.ERROR);
|
||||
uploadEntity.setErrorMessage(errorMessage);
|
||||
uploadEntityStateMap.put(uuid, uploadEntity);
|
||||
}
|
||||
|
||||
FormatInfo info = null;
|
||||
try {
|
||||
info = processor.process(resourcePath, null, templateName);
|
||||
} catch (InternalErrorException e) {
|
||||
s_logger.error("Template process exception ", e);
|
||||
return e.toString();
|
||||
}
|
||||
if (info != null) {
|
||||
loc.addFormat(info);
|
||||
uploadEntity.setVirtualSize(info.virtualSize);
|
||||
//dnld.setTemplatesize(info.virtualSize);
|
||||
//dnld.setTemplatePhysicalSize(info.size);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!loc.save()) {
|
||||
s_logger.warn("Cleaning up because we're unable to save the formats");
|
||||
loc.purge();
|
||||
}
|
||||
uploadEntity.setStatus(UploadEntity.Status.COMPLETED);
|
||||
uploadEntityStateMap.put(uploadEntity.getUuid(), uploadEntity);
|
||||
|
||||
return null;
|
||||
public void validatePostUploadRequest(String signature, String metadata, String timeout, String hostname, String uuid) throws InvalidParameterValueException{
|
||||
// check none of the params are empty
|
||||
if(StringUtils.isEmpty(signature) || StringUtils.isEmpty(metadata) || StringUtils.isEmpty(timeout)) {
|
||||
updateStateMapWithError(uuid,"signature, metadata and expires are compulsory fields.");
|
||||
throw new InvalidParameterValueException("signature, metadata and expires are compulsory fields.");
|
||||
}
|
||||
|
||||
private void parsePostBody(ByteArrayInputStream input, OutputStream output, Map<String, String> params) throws IOException {
|
||||
|
||||
byte [] bytebuf = new byte[4096];
|
||||
input.read(bytebuf,0,1024);
|
||||
|
||||
BufferedReader reader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(bytebuf)));
|
||||
int readBytes=0;
|
||||
String boundary =reader.readLine();
|
||||
readBytes+=boundary.length()+2;
|
||||
String currentLine =null;
|
||||
|
||||
while (reader.ready()) {
|
||||
currentLine = reader.readLine();
|
||||
readBytes+=currentLine.length()+2;
|
||||
if (currentLine.contains("Content-Disposition: form-data;")) {
|
||||
String paramName = currentLine;
|
||||
paramName = paramName.replace("Content-Disposition: form-data; name=", "").replace("\"", "");
|
||||
StringBuilder paramValue = new StringBuilder();
|
||||
if (paramName.contains("filename")) {
|
||||
String[] temp = paramName.split(";");
|
||||
paramName = temp[0];
|
||||
paramValue.append(temp[1].replace("filename", "").replace("=", "").replace(" ", ""));
|
||||
} else {
|
||||
currentLine = reader.readLine();
|
||||
readBytes+=currentLine.length()+2;
|
||||
while (!currentLine.contains(boundary)) {
|
||||
paramValue.append(currentLine);
|
||||
currentLine = reader.readLine();
|
||||
readBytes+=currentLine.length()+2;
|
||||
}
|
||||
}
|
||||
params.put(paramName, paramValue.toString());
|
||||
}
|
||||
if (currentLine.contains("Content-Type: ")) {
|
||||
// File Content here
|
||||
reader.readLine();
|
||||
|
||||
readBytes+=2;
|
||||
|
||||
input.reset();
|
||||
input.skip(readBytes);
|
||||
int read=0;
|
||||
while (true) {
|
||||
read=input.read(bytebuf,0,1024);
|
||||
if (read == -1) {
|
||||
break;
|
||||
}
|
||||
readBytes+=read;
|
||||
String stringBuf= new String(bytebuf);
|
||||
if (stringBuf.contains(boundary)) {
|
||||
int i=0;
|
||||
while (!(bytebuf[i] == '\r' && bytebuf[i+1] == '\n')) {
|
||||
output.write(bytebuf[i]);
|
||||
i++;
|
||||
}
|
||||
readBytes = readBytes - read + i;
|
||||
input.reset();
|
||||
input.skip(readBytes);
|
||||
break;
|
||||
}
|
||||
if (stringBuf.contains("-")) {
|
||||
int i=0;
|
||||
while (bytebuf[i] != '-') {
|
||||
output.write(bytebuf[i]);
|
||||
i++;
|
||||
}
|
||||
readBytes = readBytes - read + i;
|
||||
input.reset();
|
||||
input.skip(readBytes);
|
||||
} else {
|
||||
output.write(bytebuf,0,1024);
|
||||
}
|
||||
}
|
||||
Arrays.fill(bytebuf, (byte)0);
|
||||
input.read(bytebuf,0,1024);
|
||||
reader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(bytebuf)));
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
//validate signature
|
||||
String fullUrl = "https://" + hostname + "/upload/" + uuid;
|
||||
String computedSignature = EncryptionUtil.generateSignature(metadata + fullUrl + timeout, getPostUploadPSK());
|
||||
boolean isSignatureValid = computedSignature.equals(signature);
|
||||
if(!isSignatureValid) {
|
||||
updateStateMapWithError(uuid,"signature validation failed.");
|
||||
throw new InvalidParameterValueException("signature validation failed.");
|
||||
}
|
||||
|
||||
public void handleInternal(HttpRequest httpRequest, HttpResponse httpResponse, HttpContext httpContext) throws HttpException, IOException {
|
||||
|
||||
HttpEntity entity = null;
|
||||
if (httpRequest instanceof HttpEntityEnclosingRequest) {
|
||||
entity = ((HttpEntityEnclosingRequest) httpRequest).getEntity();
|
||||
}
|
||||
byte[] data;
|
||||
if (entity == null) {
|
||||
httpResponse.setEntity(new StringEntity("upload failed"));
|
||||
httpResponse.setStatusCode(HttpStatus.SC_UNPROCESSABLE_ENTITY);
|
||||
return;
|
||||
} else {
|
||||
data = EntityUtils.toByteArray(entity);
|
||||
}
|
||||
|
||||
ByteArrayInputStream inputStream = new ByteArrayInputStream(data);
|
||||
ByteArrayOutputStream output = new ByteArrayOutputStream();
|
||||
HashMap<String, String> params = new HashMap<>();
|
||||
|
||||
parsePostBody(inputStream, output, params);
|
||||
String encodedMetadata = params.get("metadata");
|
||||
String key = FileUtils.readFileToString(new File(POST_UPLOAD_KEY_LOCATION), "utf-8");
|
||||
String metadata = EncryptionUtil.decodeData(encodedMetadata, key);
|
||||
|
||||
Gson gson = new GsonBuilder().create();
|
||||
TemplateOrVolumePostUploadCommand cmd = gson.fromJson(metadata, TemplateOrVolumePostUploadCommand.class);
|
||||
|
||||
//call handle upload method.
|
||||
handleuplod(cmd.getEntityId(),cmd.getAbsolutePath(), cmd.getEntityUUID(),cmd.getLocalPath(),cmd.getName(),output.size(), cmd.getType(), cmd.getImageFormat(), output.toByteArray(), false, cmd.getChecksum(), cmd.getDataTo());
|
||||
|
||||
s_logger.error(new String(data));
|
||||
|
||||
httpResponse.setEntity(new StringEntity("upload successful"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpAsyncRequestConsumer<HttpRequest> processRequest(HttpRequest request, HttpContext context) throws HttpException, IOException {
|
||||
return new BasicAsyncRequestConsumer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(HttpRequest request, HttpAsyncExchange httpExchange, HttpContext context) throws HttpException, IOException {
|
||||
HttpResponse response = httpExchange.getResponse();
|
||||
handleInternal(request, response, context);
|
||||
httpExchange.submitResponse(new BasicAsyncResponseProducer(response));
|
||||
//validate timeout
|
||||
DateTime timeoutDateTime = DateTime.parse(timeout, ISODateTimeFormat.dateTime());
|
||||
if(timeoutDateTime.isBeforeNow()) {
|
||||
updateStateMapWithError(uuid,"request not valid anymore.");
|
||||
throw new InvalidParameterValueException("request not valid anymore.");
|
||||
}
|
||||
}
|
||||
|
||||
private TemplateOrVolumePostUploadCommand getTemplateOrVolumePostUploadCmd(String metadata) {
|
||||
TemplateOrVolumePostUploadCommand cmd = null;
|
||||
try {
|
||||
Gson gson = new GsonBuilder().create();
|
||||
cmd = gson.fromJson(EncryptionUtil.decodeData(metadata, getPostUploadPSK()), TemplateOrVolumePostUploadCommand.class);
|
||||
} catch(Exception ex) {
|
||||
s_logger.error("exception while decoding and deserialising metadata", ex);
|
||||
}
|
||||
return cmd;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ package org.apache.cloudstack.storage.template;
|
|||
|
||||
import java.util.Map;
|
||||
|
||||
import com.cloud.storage.template.Processor;
|
||||
import org.apache.cloudstack.storage.command.DownloadCommand;
|
||||
import org.apache.cloudstack.storage.command.DownloadCommand.ResourceType;
|
||||
import org.apache.cloudstack.storage.resource.SecondaryStorageResource;
|
||||
|
|
@ -52,7 +53,7 @@ public interface DownloadManager extends Manager {
|
|||
public String downloadS3Template(S3TO s3, long id, String url, String name, ImageFormat format, boolean hvm, Long accountId, String descr, String cksum,
|
||||
String installPathPrefix, String user, String password, long maxTemplateSizeInBytes, Proxy proxy, ResourceType resourceType);
|
||||
|
||||
Map getProcessesors();
|
||||
Map<String, Processor> getProcessors();
|
||||
|
||||
/**
|
||||
* Get the status of a download job
|
||||
|
|
|
|||
|
|
@ -104,7 +104,7 @@ public class DownloadManagerImpl extends ManagerBase implements DownloadManager
|
|||
}
|
||||
|
||||
@Override
|
||||
public Map getProcessesors() {
|
||||
public Map<String, Processor> getProcessors() {
|
||||
return _processors;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -19,11 +19,8 @@ package org.apache.cloudstack.storage.template;
|
|||
|
||||
|
||||
import com.cloud.storage.Storage;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
|
||||
public class UploadEntity {
|
||||
private long filesize;
|
||||
private long downloadedsize;
|
||||
private String filename;
|
||||
private String installPathPrefix;
|
||||
|
|
@ -33,8 +30,7 @@ public class UploadEntity {
|
|||
private String uuid;
|
||||
private long entityId;
|
||||
private String chksum;
|
||||
|
||||
|
||||
private long physicalSize;
|
||||
|
||||
public static enum ResourceType {
|
||||
VOLUME, TEMPLATE
|
||||
|
|
@ -45,17 +41,15 @@ public class UploadEntity {
|
|||
}
|
||||
|
||||
private Status uploadState;
|
||||
private FileOutputStream filewriter = null;
|
||||
private String errorMessage=null;
|
||||
private File file;
|
||||
private ResourceType resourceType;
|
||||
private long virtualSize;
|
||||
private boolean isMetaDataPopulated;
|
||||
|
||||
public static long s_maxTemplateSize = 50L * 1024L * 1024L * 1024L;
|
||||
|
||||
public UploadEntity(String uuid,long entityId,long filesize, Status status, String filename, String installPathPrefix){
|
||||
public UploadEntity(String uuid, long entityId, Status status, String filename, String installPathPrefix){
|
||||
this.uuid=uuid;
|
||||
this.filesize=filesize;
|
||||
this.uploadState=status;
|
||||
this.downloadedsize=0l;
|
||||
this.filename=filename;
|
||||
|
|
@ -63,8 +57,8 @@ public class UploadEntity {
|
|||
this.entityId=entityId;
|
||||
}
|
||||
|
||||
public void setEntitysize(long filesize) {
|
||||
this.filesize=filesize;
|
||||
public UploadEntity(){
|
||||
|
||||
}
|
||||
|
||||
public void setStatus(Status status) {
|
||||
|
|
@ -75,14 +69,6 @@ public class UploadEntity {
|
|||
this.errorMessage=errorMessage;
|
||||
}
|
||||
|
||||
public FileOutputStream getFilewriter() {
|
||||
return filewriter;
|
||||
}
|
||||
|
||||
public long getEntitysize() {
|
||||
return filesize;
|
||||
}
|
||||
|
||||
public long getDownloadedsize() {
|
||||
return downloadedsize;
|
||||
}
|
||||
|
|
@ -95,22 +81,10 @@ public class UploadEntity {
|
|||
return uploadState;
|
||||
}
|
||||
|
||||
public void setFilewriter(FileOutputStream filewriter) {
|
||||
this.filewriter = filewriter;
|
||||
}
|
||||
|
||||
public void incremetByteCount(long numberOfBytes) {
|
||||
this.downloadedsize+= numberOfBytes;
|
||||
}
|
||||
|
||||
public File getFile() {
|
||||
return file;
|
||||
}
|
||||
|
||||
public void setFile(File file) {
|
||||
this.file = file;
|
||||
}
|
||||
|
||||
public String getFilename() {
|
||||
return filename;
|
||||
}
|
||||
|
|
@ -181,4 +155,22 @@ public class UploadEntity {
|
|||
public void setVirtualSize(long virtualSize) {
|
||||
this.virtualSize = virtualSize;
|
||||
}
|
||||
|
||||
public boolean isMetaDataPopulated() {
|
||||
return isMetaDataPopulated;
|
||||
}
|
||||
|
||||
public void setMetaDataPopulated(boolean isMetaDataPopulated) {
|
||||
this.isMetaDataPopulated = isMetaDataPopulated;
|
||||
}
|
||||
|
||||
public void setPhysicalSize(long physicalSize) {
|
||||
this.physicalSize = physicalSize;
|
||||
}
|
||||
|
||||
public long getPhysicalSize() {
|
||||
return physicalSize;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue