mirror of https://github.com/apache/cloudstack.git
560 lines
19 KiB
Java
560 lines
19 KiB
Java
// Licensed to the Apache Software Foundation (ASF) under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing,
|
|
// software distributed under the License is distributed on an
|
|
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
// KIND, either express or implied. See the License for the
|
|
// specific language governing permissions and limitations
|
|
// under the License.
|
|
/* Took the basic code from Axis 1.2 and modified to fit into the cloud code base */
|
|
|
|
package com.cloud.bridge.io;
|
|
|
|
import java.io.FilterInputStream;
|
|
import java.io.IOException;
|
|
import java.io.InputStream;
|
|
|
|
import org.apache.log4j.Logger;
|
|
|
|
/**
|
|
* This class takes the input stream and turns it multiple streams.
|
|
DIME version 0 format
|
|
<pre>
|
|
0 1 2 3
|
|
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
|
|
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ ---
|
|
| VERSION |B|E|C| TYPE_T| OPT_T | OPTIONS_LENGTH | A
|
|
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
|
| ID_LENGTH | TYPE_LENGTH | Always present 12 bytes
|
|
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ even on chunked data.
|
|
| DATA_LENGTH | V
|
|
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ ---
|
|
| /
|
|
/ OPTIONS + PADDING /
|
|
/ (absent for version 0) |
|
|
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
|
| /
|
|
/ ID + PADDING /
|
|
/ |
|
|
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
|
| /
|
|
/ TYPE + PADDING /
|
|
/ |
|
|
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
|
| /
|
|
/ DATA + PADDING /
|
|
/ |
|
|
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
|
</pre>
|
|
* This implementation of input stream does not support marking operations.
|
|
*
|
|
* Incoming data is DIME encoded when its MIME type is "application/dime".
|
|
* Then use this class to pull out 2 streams:
|
|
* (1) The first stream is the SOAP request,
|
|
* (2) The second stream is a chunked attachment (e.g., a file to store)
|
|
*
|
|
* The DIME format is defined at this reference:
|
|
* http://msdn.microsoft.com/en-us/library/aa480488.aspx
|
|
*
|
|
* @author Rick Rineholt
|
|
*/
|
|
public class DimeDelimitedInputStream extends FilterInputStream {
|
|
protected final static Logger logger = Logger.getLogger(DimeDelimitedInputStream.class);
|
|
|
|
InputStream is = null; //The source input stream.
|
|
boolean closed = true; //The stream has been closed.
|
|
boolean theEnd = false; //There are no more streams left.
|
|
boolean moreChunks = false; //More chunks are a coming!
|
|
boolean MB = false; //Message begin flag
|
|
boolean ME = false; //Message end flag
|
|
String type = null; //
|
|
String id = null; //
|
|
String tnf = null; //DIME type format
|
|
long recordLength = 0L; //length of the current record.
|
|
long bytesRead = 0L; //How many bytes of the current record have been read.
|
|
int dataPadLength = 0; //How many pad bytes there are.
|
|
|
|
protected int streamNo = 0;
|
|
protected IOException streamInError = null;
|
|
|
|
private static byte[] trash = new byte[4];
|
|
protected static int streamCount = 0; //number of streams produced.
|
|
|
|
protected static synchronized int newStreamNo() {
|
|
logger.debug("streamNo " + (streamCount + 1));
|
|
return ++streamCount;
|
|
}
|
|
|
|
/**
|
|
* There can be multiple streams in a DIME encoding. For example, the first
|
|
* stream can be a SOAP message, and the second stream a binary attachment (e.g.,
|
|
* a file). During reading after an EOF is returned, this function should be
|
|
* called to see if there is another stream following the last.
|
|
*
|
|
* @return the dime delimited stream, null if there are no more streams
|
|
* @throws IOException if there was an error loading the data for the next stream
|
|
*/
|
|
public synchronized DimeDelimitedInputStream getNextStream() throws IOException {
|
|
if (null != streamInError)
|
|
throw streamInError;
|
|
if (theEnd)
|
|
return null;
|
|
|
|
//Each Stream must be read in succession
|
|
if (bytesRead < recordLength || moreChunks)
|
|
throw new RuntimeException("attach.dimeReadFullyError");
|
|
|
|
dataPadLength -= readPad(dataPadLength);
|
|
|
|
//Create an new dime stream that comes after this one.
|
|
return new DimeDelimitedInputStream(this.is);
|
|
}
|
|
|
|
/**
|
|
* Create a new dime stream.
|
|
*
|
|
* @param is the <code>InputStream</code> to wrap
|
|
* @throws IOException if anything goes wrong
|
|
*/
|
|
public DimeDelimitedInputStream(InputStream is) throws IOException {
|
|
super(null);
|
|
streamNo = newStreamNo();
|
|
closed = false;
|
|
this.is = is;
|
|
readHeader(false);
|
|
}
|
|
|
|
/**
|
|
* Make sure to skip the pad which appear in several parts of a DIME message.
|
|
* @param size
|
|
* @return
|
|
* @throws IOException
|
|
*/
|
|
private final int readPad(int size) throws IOException {
|
|
if (0 == size)
|
|
return 0;
|
|
int read = readFromStream(trash, 0, size);
|
|
|
|
if (size != read) {
|
|
streamInError = new IOException("attach.dimeNotPaddedCorrectly");
|
|
throw streamInError;
|
|
}
|
|
return read;
|
|
}
|
|
|
|
private final int readFromStream(byte[] b) throws IOException {
|
|
return readFromStream(b, 0, b.length);
|
|
}
|
|
|
|
private final int readFromStream(byte[] b, int start, int length) throws IOException {
|
|
int br = 0;
|
|
int brTotal = 0;
|
|
|
|
if (length == 0)
|
|
return 0;
|
|
|
|
do {
|
|
try {
|
|
br = is.read(b, brTotal + start, length - brTotal);
|
|
} catch (IOException e) {
|
|
streamInError = e;
|
|
throw e;
|
|
}
|
|
if (br > 0)
|
|
brTotal += br;
|
|
} while (br > -1 && brTotal < length);
|
|
|
|
return br > -1 ? brTotal : br;
|
|
}
|
|
|
|
/**
|
|
* Get the id for this stream part.
|
|
* @return the id;
|
|
*/
|
|
public String getContentId() {
|
|
return id;
|
|
}
|
|
|
|
public String getDimeTypeNameFormat() {
|
|
return tnf;
|
|
}
|
|
|
|
/**
|
|
* Get the type, as read from the header.
|
|
* @return the type of this dime
|
|
*/
|
|
public String getType() {
|
|
return type;
|
|
}
|
|
|
|
/**
|
|
* Read from the DIME stream.
|
|
*
|
|
* @param b is the array to read into.
|
|
* @param off is the offset
|
|
* @return the number of bytes read. -1 if endof stream
|
|
* @throws IOException if data could not be read from the stream
|
|
*/
|
|
public synchronized int read(byte[] b, int off, int len) throws IOException {
|
|
if (closed) {
|
|
dataPadLength -= readPad(dataPadLength);
|
|
throw new IOException("streamClosed");
|
|
}
|
|
return _read(b, off, len);
|
|
}
|
|
|
|
protected int _read(byte[] b, int off, int len) throws IOException {
|
|
int totalbytesread = 0;
|
|
int bytes2read = 0;
|
|
|
|
if (len < 0)
|
|
throw new IllegalArgumentException("attach.readLengthError" + len);
|
|
|
|
if (off < 0)
|
|
throw new IllegalArgumentException("attach.readOffsetError" + off);
|
|
|
|
if (b == null)
|
|
throw new IllegalArgumentException("attach.readArrayNullError");
|
|
|
|
if (b.length < off + len)
|
|
throw new IllegalArgumentException("attach.readArraySizeError " + b.length + " " + len + " " + off);
|
|
|
|
if (null != streamInError)
|
|
throw streamInError;
|
|
if (0 == len)
|
|
return 0; //quick.
|
|
|
|
// odd case no data to read -- give back 0 next time -1;
|
|
if (recordLength == 0 && bytesRead == 0 && !moreChunks) {
|
|
++bytesRead;
|
|
if (ME)
|
|
finalClose();
|
|
return 0;
|
|
}
|
|
if (bytesRead >= recordLength && !moreChunks) {
|
|
dataPadLength -= readPad(dataPadLength);
|
|
if (ME)
|
|
finalClose();
|
|
return -1;
|
|
}
|
|
|
|
do {
|
|
if (bytesRead >= recordLength && moreChunks)
|
|
readHeader(true);
|
|
bytes2read = (int)Math.min(recordLength - bytesRead, (long)len - totalbytesread);
|
|
|
|
try {
|
|
bytes2read = is.read(b, off + totalbytesread, bytes2read);
|
|
} catch (IOException e) {
|
|
streamInError = e;
|
|
throw e;
|
|
}
|
|
|
|
if (0 < bytes2read) {
|
|
totalbytesread += bytes2read;
|
|
bytesRead += bytes2read;
|
|
}
|
|
} while (bytes2read > -1 && totalbytesread < len && (bytesRead < recordLength || moreChunks));
|
|
|
|
if (0 > bytes2read) {
|
|
if (moreChunks) {
|
|
streamInError = new IOException("attach.DimeStreamError0");
|
|
throw streamInError;
|
|
}
|
|
if (bytesRead < recordLength) {
|
|
streamInError = new IOException("attach.DimeStreamError1 " + (recordLength - bytesRead));
|
|
throw streamInError;
|
|
}
|
|
if (!ME) {
|
|
streamInError = new IOException("attach.DimeStreamError0");
|
|
throw streamInError;
|
|
}
|
|
//in theory the last chunk of data should also have been padded, but lets be tolerant of that.
|
|
dataPadLength = 0;
|
|
} else if (bytesRead >= recordLength) {
|
|
//get rid of pading.
|
|
try {
|
|
dataPadLength -= readPad(dataPadLength);
|
|
} catch (IOException e) {
|
|
//in theory the last chunk of data should also have been padded, but lets be tolerant of that.
|
|
if (!ME)
|
|
throw e;
|
|
else {
|
|
dataPadLength = 0;
|
|
streamInError = null;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (bytesRead >= recordLength && ME)
|
|
finalClose();
|
|
|
|
return totalbytesread >= 0 ? totalbytesread : -1;
|
|
}
|
|
|
|
/**
|
|
* The DIME header is read into local class data fields and are not
|
|
* passed as part of the stream data.
|
|
*
|
|
* @param isChunk
|
|
* @throws IOException
|
|
*/
|
|
protected void readHeader(boolean isChunk) throws IOException {
|
|
bytesRead = 0; //How many bytes of the record have been read.
|
|
|
|
if (isChunk) {
|
|
if (!moreChunks)
|
|
throw new RuntimeException("attach.DimeStreamError2");
|
|
dataPadLength -= readPad(dataPadLength); //Just in case it was left over.
|
|
}
|
|
|
|
byte[] header = new byte[12];
|
|
|
|
if (header.length != readFromStream(header)) {
|
|
streamInError = new IOException("attach.DimeStreamError3 " + header.length);
|
|
throw streamInError;
|
|
}
|
|
|
|
//VERSION
|
|
byte version = (byte)((header[0] >>> 3) & 0x1f);
|
|
if (version > 1) {
|
|
streamInError = new IOException("attach.DimeStreamError4 " + version);
|
|
throw streamInError;
|
|
}
|
|
|
|
//B, E, C
|
|
MB = 0 != (0x4 & header[0]);
|
|
ME = 0 != (0x2 & header[0]);
|
|
moreChunks = 0 != (0x1 & header[0]);
|
|
|
|
//TYPE_T
|
|
if (!isChunk) {
|
|
switch (((header[1] >>> 4) & (byte)0x0f)) {
|
|
case 0x00:
|
|
tnf = "UNCHANGED";
|
|
break;
|
|
case 0x01:
|
|
tnf = "MIME";
|
|
break;
|
|
case 0x02:
|
|
tnf = "URI";
|
|
break;
|
|
default:
|
|
tnf = "UNKNOWN";
|
|
break;
|
|
}
|
|
}
|
|
|
|
//OPTIONS_LENGTH
|
|
int optionsLength = ((((int)header[2]) << 8) & 0xff00) | ((int)header[3]);
|
|
|
|
//ID_LENGTH
|
|
int idLength = ((((int)header[4]) << 8) & 0xff00) | ((int)header[5]);
|
|
|
|
//TYPE_LENGTH
|
|
int typeLength = ((((int)header[6]) << 8) & 0xff00) | ((int)header[7]);
|
|
|
|
//DATA_LENGTH
|
|
recordLength =
|
|
((((long)header[8]) << 24) & 0xff000000L) | ((((long)header[9]) << 16) & 0xff0000L) | ((((long)header[10]) << 8) & 0xff00L) | ((long)header[11] & 0xffL);
|
|
|
|
//OPTIONS + PADDING
|
|
if (0 != optionsLength) {
|
|
byte[] optBytes = new byte[optionsLength];
|
|
|
|
if (optionsLength != readFromStream(optBytes)) {
|
|
streamInError = new IOException("attach.DimeStreamError5 " + optionsLength);
|
|
throw streamInError;
|
|
}
|
|
optBytes = null; // throw it away, don't know anything about options.
|
|
|
|
int pad = (int)((4L - (optionsLength & 0x3L)) & 0x03L);
|
|
|
|
if (pad != readFromStream(header, 0, pad)) {
|
|
streamInError = new IOException("attach.DimeStreamError7");
|
|
throw streamInError;
|
|
}
|
|
}
|
|
|
|
// ID + PADDING
|
|
if (0 < idLength) {
|
|
byte[] idBytes = new byte[idLength];
|
|
|
|
if (idLength != readFromStream(idBytes)) {
|
|
streamInError = new IOException("attach.DimeStreamError8");
|
|
throw streamInError;
|
|
}
|
|
if (idLength != 0 && !isChunk)
|
|
id = new String(idBytes);
|
|
|
|
int pad = (int)((4L - (idLength & 0x3L)) & 0x03L);
|
|
|
|
if (pad != readFromStream(header, 0, pad)) {
|
|
streamInError = new IOException("attach.DimeStreamError9");
|
|
throw streamInError;
|
|
}
|
|
}
|
|
|
|
//TYPE + PADDING
|
|
if (0 < typeLength) {
|
|
byte[] typeBytes = new byte[typeLength];
|
|
|
|
if (typeLength != readFromStream(typeBytes)) {
|
|
streamInError = new IOException("attach.DimeStreamError10");
|
|
throw streamInError;
|
|
}
|
|
if (typeLength != 0 && !isChunk)
|
|
type = new String(typeBytes);
|
|
|
|
int pad = (int)((4L - (typeLength & 0x3L)) & 0x03L);
|
|
|
|
if (pad != readFromStream(header, 0, pad)) {
|
|
streamInError = new IOException("attach.DimeStreamError11");
|
|
throw streamInError;
|
|
}
|
|
}
|
|
logger.debug("MB:" + MB + ", ME:" + ME + ", CF:" + moreChunks + "Option length:" + optionsLength + ", ID length:" + idLength + ", typeLength:" + typeLength +
|
|
", TYPE_T:" + tnf);
|
|
logger.debug("id:\"" + id + "\"");
|
|
logger.debug("type:\"" + type + "\"");
|
|
logger.debug("recordlength:\"" + recordLength + "\"");
|
|
|
|
dataPadLength = (int)((4L - (recordLength & 0x3L)) & 0x03L);
|
|
}
|
|
|
|
/**
|
|
* Read from the delimited stream.
|
|
*
|
|
* @param b is the array to read into. Read as much as possible
|
|
* into the size of this array.
|
|
* @return the number of bytes read. -1 if endof stream
|
|
* @throws IOException if data could not be read from the stream
|
|
*/
|
|
public int read(byte[] b) throws IOException {
|
|
return read(b, 0, b.length);
|
|
}
|
|
|
|
// fixme: this seems a bit inefficient
|
|
/**
|
|
* Read from the boundary delimited stream.
|
|
*
|
|
* @return the byte read, or -1 if endof stream
|
|
* @throws IOException if there was an error reading the data
|
|
*/
|
|
public int read() throws IOException {
|
|
byte[] b = new byte[1];
|
|
int read = read(b, 0, 1);
|
|
|
|
if (read < 0)
|
|
return -1; // fixme: should we also check for read != 1?
|
|
return (b[0] & 0xff); // convert byte value to a positive int
|
|
}
|
|
|
|
/**
|
|
* Closes the stream.
|
|
* This will take care of flushing any remaining data to the stream.
|
|
* Multiple calls to this method will result in the stream being closed once
|
|
* and then all subsequent calls being ignored.
|
|
*
|
|
* @throws IOException if the stream could not be closed
|
|
*/
|
|
public void close() throws IOException {
|
|
synchronized (this) {
|
|
if (closed)
|
|
return;
|
|
closed = true; //mark it closed.
|
|
}
|
|
logger.debug("bStreamClosed " + streamNo);
|
|
|
|
if (bytesRead < recordLength || moreChunks) {
|
|
//We need get this off the stream. Easy way to flush through the stream;
|
|
byte[] readrest = new byte[1024 * 16];
|
|
int bread = 0;
|
|
|
|
do {
|
|
bread = _read(readrest, 0, readrest.length); //should also close the original stream.
|
|
} while (bread > -1);
|
|
}
|
|
dataPadLength -= readPad(dataPadLength);
|
|
}
|
|
|
|
/**
|
|
* Skip n bytes of data in the DIME stream, while reading and processing
|
|
* any headers in the current stream.
|
|
*
|
|
* @param n - number of data bytes to skip
|
|
* @return number of bytes actually skipped
|
|
* @throws IOException
|
|
*/
|
|
public long skip(long n) throws IOException {
|
|
long bytesSkipped = 0;
|
|
long bytes2Read = 0;
|
|
byte[] dumpbytes = new byte[1024];
|
|
|
|
while (n > 0) {
|
|
bytes2Read = (n > 1024 ? 1024 : n);
|
|
bytes2Read = _read(dumpbytes, 0, (int)bytes2Read);
|
|
|
|
n -= bytes2Read;
|
|
bytesSkipped += bytes2Read;
|
|
}
|
|
|
|
return bytesSkipped;
|
|
}
|
|
|
|
/**
|
|
* Mark the stream. This is not supported.
|
|
*/
|
|
public void mark(int readlimit) { //do nothing
|
|
}
|
|
|
|
public void reset() throws IOException {
|
|
streamInError = new IOException("attach.bounday.mns");
|
|
throw streamInError;
|
|
}
|
|
|
|
public boolean markSupported() {
|
|
return false;
|
|
}
|
|
|
|
public synchronized int available() throws IOException {
|
|
if (null != streamInError)
|
|
throw streamInError;
|
|
|
|
int chunkAvail = (int)Math.min((long)Integer.MAX_VALUE, recordLength - bytesRead);
|
|
int streamAvail = 0;
|
|
|
|
try {
|
|
streamAvail = is.available();
|
|
} catch (IOException e) {
|
|
streamInError = e;
|
|
throw e;
|
|
}
|
|
|
|
if (chunkAvail == 0 && moreChunks && (12 + dataPadLength) <= streamAvail) {
|
|
dataPadLength -= readPad(dataPadLength);
|
|
readHeader(true);
|
|
return available();
|
|
}
|
|
return Math.min(streamAvail, chunkAvail);
|
|
}
|
|
|
|
protected void finalClose() throws IOException {
|
|
try {
|
|
theEnd = true;
|
|
if (null != is)
|
|
is.close();
|
|
} finally {
|
|
is = null;
|
|
}
|
|
}
|
|
}
|