getPads(Direction direction);
+
+ /**
+ * Connect link to given pad.
+ *
+ * @param padName
+ * a pad name. Standard pads are "stdin" and "stdout".
+ */
+ void setLink(String padName, Link link, Direction direction);
+
+ /**
+ * Disconnect link from given pad.
+ *
+ * @param padName
+ * Standard pads are "stdin" and "stdout".
+ */
+ void dropLink(String padName);
+
+ /**
+ * Pull data from element and handle it. Element should ask one of it input
+ * pads for data, handle data and push result to it sink(s), if any.
+ *
+ * @param block
+ * block until data will be available, or do a slight delay at least,
+ * when data is not available
+ */
+ void poll(boolean block);
+
+ /**
+ * Handle incoming data.
+ *
+ * @param buf
+ * a data
+ * @param link
+ * TODO
+ */
+ void handleData(ByteBuffer buf, Link link);
+
+ /**
+ * Handle event.
+ *
+ * @param event
+ * an event
+ * @param direction
+ * if IN, then send event to input pads, when OUT, then send to
+ * output pads
+ */
+ void handleEvent(Event event, Direction direction);
+
+ /**
+ * Get element ID.
+ */
+ String getId();
+
+ /**
+ * Validate element: check is all required pads are connected.
+ */
+ void validate();
+
+ /**
+ * Drop link.
+ *
+ * @param link a link to drop
+ */
+ void dropLink(Link link);
+
+ /**
+ * Drop existing link and replace it by new link.
+ */
+ void replaceLink(Link existingLink, Link newLink);
+}
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Event.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Event.java
new file mode 100644
index 00000000000..5e1a3893c52
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Event.java
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+public enum Event {
+ STREAM_START,
+ STREAM_CLOSE,
+
+ /**
+ * Upgrade socket to SSL.
+ */
+ SOCKET_UPGRADE_TO_SSL,
+
+ /**
+ * Switch links to input mode.
+ */
+ LINK_SWITCH_TO_PULL_MODE
+
+}
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSink.java
new file mode 100644
index 00000000000..65fb29e3f0f
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSink.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+public class FakeSink extends BaseElement {
+
+ public FakeSink(String id) {
+ super(id);
+ }
+
+ @Override
+ public void handleData(ByteBuffer buf, Link link) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Received buf #" + (packetNumber) + " " + buf + ".");
+
+ if (buf == null)
+ return;
+
+ // Use packetNumber variable to count incoming packets
+ packetNumber++;
+
+ buf.unref();
+ }
+
+ @Override
+ public String toString() {
+ return "FakeSink(" + id + ")";
+ }
+
+ @Override
+ public void handleEvent(Event event, Direction direction) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Event received: " + event + ".");
+
+ }
+
+ /**
+ * Example.
+ */
+ public static void main(String args[]) {
+
+ Element sink = new FakeSink("sink") {
+ {
+ verbose = true;
+ }
+ };
+
+ byte[] data = new byte[] { 1, 2, 3 };
+ ByteBuffer buf = new ByteBuffer(data);
+ sink.setLink(STDIN, new SyncLink(), Direction.IN);
+ sink.getLink(STDIN).sendData(buf);
+
+ }
+
+}
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSource.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSource.java
new file mode 100644
index 00000000000..4cf65034a73
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/FakeSource.java
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+public class FakeSource extends BaseElement {
+
+ /**
+ * Delay for null packets in poll method when blocking is requested, in
+ * milliseconds.
+ */
+ protected long delay = SyncLink.STANDARD_DELAY_FOR_EMPTY_PACKET;
+
+ public FakeSource(String id) {
+ super(id);
+ }
+
+ @Override
+ public void poll(boolean block) {
+ if (numBuffers > 0 && packetNumber >= numBuffers) {
+ // Close stream when limit of packets is reached
+ sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+ return;
+ }
+
+ // Prepare new packet
+ ByteBuffer buf = initializeData();
+
+ // Push it to output(s)
+ pushDataToAllOuts(buf);
+
+ // Make slight delay when blocking input was requested (to avoid
+ // consuming of 100% in parent loop)
+ if (block)
+ delay();
+
+ }
+
+ /**
+ * Make slight delay. Should be used when blocking input is requested in pull
+ * mode, but null packed was returned by input.
+ */
+ protected void delay() {
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ /**
+ * Initialize data.
+ */
+ public ByteBuffer initializeData() {
+ ByteBuffer buf = new ByteBuffer(incommingBufLength);
+
+ // Set first byte of package to it sequance number
+ buf.data[buf.offset] = (byte) (packetNumber % 128);
+
+ // Initialize rest of bytes with sequential values, which are
+ // corresponding with their position in byte buffer
+ for (int i = buf.offset + 1; i < buf.length; i++)
+ buf.data[i] = (byte) (i % 128);
+
+ buf.putMetadata(ByteBuffer.SEQUENCE_NUMBER, packetNumber);
+ buf.putMetadata("src", id);
+
+ return buf;
+ }
+
+ @Override
+ public String toString() {
+ return "FakeSource(" + id + ")";
+ }
+
+ public static void main(String args[]) {
+
+ Element fakeSource = new FakeSource("source 3/10/100") {
+ {
+ verbose = true;
+ this.incommingBufLength = 3;
+ this.numBuffers = 10;
+ this.delay = 100;
+ }
+ };
+
+ Element fakeSink = new FakeSink("sink") {
+ {
+ this.verbose = true;
+ }
+ };
+
+ Element fakeSink2 = new FakeSink("sink2") {
+ {
+ this.verbose = true;
+ }
+ };
+
+ Link link = new SyncLink();
+
+ fakeSource.setLink(STDOUT, link, Direction.OUT);
+ fakeSink.setLink(STDIN, link, Direction.IN);
+
+ Link link2 = new SyncLink();
+
+ fakeSource.setLink("out2", link2, Direction.OUT);
+ fakeSink2.setLink(STDIN, link2, Direction.IN);
+
+ link.run();
+
+ }
+
+}
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/InputStreamSource.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/InputStreamSource.java
new file mode 100644
index 00000000000..b05637f0db0
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/InputStreamSource.java
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Source element, which reads data from InputStream.
+ */
+public class InputStreamSource extends BaseElement {
+
+ protected InputStream is;
+ protected SocketWrapper socketWrapper;
+
+ public InputStreamSource(String id) {
+ super(id);
+ }
+
+ public InputStreamSource(String id, InputStream is) {
+ super(id);
+ this.is = is;
+ }
+
+ public InputStreamSource(String id, SocketWrapper socketWrapper) {
+ super(id);
+ this.socketWrapper = socketWrapper;
+ }
+
+ @Override
+ public void handleEvent(Event event, Direction direction) {
+ switch (event) {
+ case SOCKET_UPGRADE_TO_SSL:
+ socketWrapper.upgradeToSsl();
+ break;
+ default:
+ super.handleEvent(event, direction);
+ }
+ }
+
+ @Override
+ public void setLink(String padName, Link link, Direction direction) {
+ switch (direction) {
+ case OUT:
+ super.setLink(padName, link, direction);
+
+ if (is == null) {
+ // Pause links until data stream will be ready
+ link.pause();
+ }
+ break;
+ case IN:
+ throw new RuntimeException("Cannot assign link to input pad in source element. Element: " + this + ", pad: " + padName + ", link: " + link + ".");
+ }
+ }
+
+ public void setInputStream(InputStream is) {
+ this.is = is;
+
+ // Resume links
+ resumeLinks();
+ }
+
+ private void resumeLinks() {
+ for (DataSink sink : outputPads.values())
+ ((Link) sink).resume();
+ }
+
+ /**
+ * Read data from input stream.
+ */
+ @Override
+ public void poll(boolean block) {
+ try {
+ if (!block && is.available() == 0) {
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: No data in stream is available now, returning.");
+
+ return;
+ }
+
+ // Create buffer of recommended size and with default offset
+ ByteBuffer buf = new ByteBuffer(incommingBufLength);
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Reading data from stream.");
+
+ int actualLength = is.read(buf.data, buf.offset, buf.data.length - buf.offset);
+
+ if (actualLength < 0) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: End of stream.");
+
+ buf.unref();
+ closeStream();
+ sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+ return;
+ }
+
+ if (actualLength == 0) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Empty buffer is read from stream.");
+
+ buf.unref();
+ return;
+ }
+
+ buf.length = actualLength;
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Data read from stream: " + buf + ".");
+
+ pushDataToAllOuts(buf);
+
+ } catch (IOException e) {
+ System.err.println("[" + this + "] ERROR: " + e.getMessage());
+ closeStream();
+ }
+ }
+
+ @Override
+ protected void onClose() {
+ closeStream();
+ }
+
+ private void closeStream() {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Closing stream.");
+
+ try {
+ is.close();
+ } catch (IOException e) {
+ }
+ try {
+ sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+ } catch (Exception e) {
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "InputStreamSource(" + id + ")";
+ }
+
+ /**
+ * Example.
+ */
+ public static void main(String args[]) {
+ InputStream is = new ByteArrayInputStream(new byte[] { 1, 2, 3 });
+
+ InputStreamSource source = new InputStreamSource("source") {
+ {
+ verbose = true;
+ }
+ };
+ Element fakeSink = new FakeSink("sink") {
+ {
+ verbose = true;
+ }
+ };
+
+ Link link = new SyncLink() {
+ {
+ verbose = true;
+ }
+ };
+
+ source.setLink(STDOUT, link, Direction.OUT);
+ fakeSink.setLink(STDIN, link, Direction.IN);
+
+ source.setInputStream(is);
+
+ link.sendEvent(Event.STREAM_START, Direction.OUT);
+ link.run();
+
+ }
+
+}
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Link.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Link.java
new file mode 100644
index 00000000000..bd970f08afd
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Link.java
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+/**
+ * Link is wire between two elements. It always must contain source and sink
+ * elements.
+ */
+public interface Link extends DataSource, DataSink, Runnable {
+
+ /**
+ * Wire this link with given sink.
+ *
+ * @param sink
+ * an Element
+ * @return same sink element, for chaining
+ */
+ Element setSink(Element sink);
+
+ /**
+ * Wire this link with given source.
+ *
+ * @param source
+ * an Element
+ * @return same source element, for chaining
+ */
+ Element setSource(Element source);
+
+ Element getSource();
+
+ Element getSink();
+
+ /**
+ * Hold all data in cache, don't pass data to sink until resumed.
+ */
+ void pause();
+
+ /**
+ * Resume transfer.
+ */
+ void resume();
+
+ /**
+ * Change mode of operation of this link from push mode to pull mode.
+ */
+ void setPullMode();
+
+ /**
+ * Drop this link.
+ */
+ void drop();
+}
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSink.java
new file mode 100644
index 00000000000..ce9fdf91078
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSink.java
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+import java.util.Arrays;
+
+/**
+ * Compare incoming packets with expected packets.
+ */
+public class MockSink extends BaseElement {
+
+ protected ByteBuffer bufs[] = null;
+
+ public MockSink(String id) {
+ super(id);
+ }
+
+ public MockSink(String id, ByteBuffer bufs[]) {
+ super(id);
+ this.bufs = bufs;
+ }
+
+ @Override
+ public void handleData(ByteBuffer buf, Link link) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Received buf #" + (packetNumber) + " " + buf + ".");
+
+ if (buf == null)
+ return;
+
+ if (packetNumber >= bufs.length)
+ throw new AssertionError("[" + this + "] Incoming buffer #" + packetNumber + " is not expected. Number of expected buffers: " + bufs.length
+ + ", unexpected buffer: " + buf + ".");
+
+ // Compare incoming buffer with expected buffer
+ if (!Arrays.equals(bufs[packetNumber].toByteArray(), buf.toByteArray()))
+ throw new AssertionError("[" + this + "] Incoming buffer #" + packetNumber + " is not equal to expected buffer.\n Actual bufer: " + buf
+ + ",\n expected buffer: " + bufs[packetNumber] + ".");
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: buffers are equal.");
+
+ // Use packetNumber variable to count incoming packets
+ packetNumber++;
+
+ buf.unref();
+ }
+
+ @Override
+ protected void onClose() {
+ super.onClose();
+
+ if (packetNumber != bufs.length)
+ throw new AssertionError("[" + this + "] Number of expected buffers: " + bufs.length + ", number of actual buffers: " + packetNumber + ".");
+ }
+
+ @Override
+ public String toString() {
+ return "MockSink(" + id + ")";
+ }
+
+ /**
+ * Example.
+ */
+ public static void main(String args[]) {
+
+ Element mockSource = new MockSource("source") {
+ {
+ this.bufs = new ByteBuffer[] { new ByteBuffer(new byte[] { 1, 1, 2, 3, 4, 5 }), new ByteBuffer(new byte[] { 2, 1, 2, 3, 4 }),
+ new ByteBuffer(new byte[] { 3, 1, 2, 3 }), new ByteBuffer(new byte[] { 4, 1, 2 }), new ByteBuffer(new byte[] { 5, 1 }) };
+ this.verbose = true;
+ this.delay = 100;
+ this.numBuffers = this.bufs.length;
+ }
+ };
+
+ Element mockSink = new MockSink("sink") {
+ {
+ this.bufs = new ByteBuffer[] { new ByteBuffer(new byte[] { 1, 1, 2, 3, 4, 5 }), new ByteBuffer(new byte[] { 2, 1, 2, 3, 4 }),
+ new ByteBuffer(new byte[] { 3, 1, 2, 3 }), new ByteBuffer(new byte[] { 4, 1, 2 }), new ByteBuffer(new byte[] { 5, 1 }) };
+ this.verbose = true;
+ }
+ };
+
+ Link link = new SyncLink() {
+ {
+ this.verbose = true;
+ }
+ };
+
+ mockSource.setLink(STDOUT, link, Direction.OUT);
+ mockSink.setLink(STDIN, link, Direction.IN);
+
+ link.run();
+ }
+
+}
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSource.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSource.java
new file mode 100644
index 00000000000..db47db2ca50
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/MockSource.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+public class MockSource extends FakeSource {
+
+ protected ByteBuffer bufs[] = null;
+
+ public MockSource(String id) {
+ super(id);
+ }
+
+ public MockSource(String id, ByteBuffer bufs[]) {
+ super(id);
+ this.bufs = bufs;
+ }
+
+ /**
+ * Initialize data.
+ */
+ @Override
+ public ByteBuffer initializeData() {
+ if (packetNumber >= bufs.length) {
+ sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+ return null;
+ }
+
+ ByteBuffer buf = bufs[packetNumber];
+
+ buf.putMetadata(ByteBuffer.SEQUENCE_NUMBER, packetNumber);
+ return buf;
+ }
+
+ @Override
+ public void handleEvent(Event event, Direction direction) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Event received: " + event + ".");
+
+ }
+
+ @Override
+ public String toString() {
+ return "MockSource(" + id + ")";
+ }
+
+ /**
+ * Example.
+ */
+ public static void main(String args[]) {
+
+ Element mockSource = new MockSource("source") {
+ {
+ this.bufs = new ByteBuffer[] { new ByteBuffer(new byte[] { 1, 1, 2, 3, 4, 5 }), new ByteBuffer(new byte[] { 2, 1, 2, 3, 4 }),
+ new ByteBuffer(new byte[] { 3, 1, 2, 3 }), new ByteBuffer(new byte[] { 4, 1, 2 }), new ByteBuffer(new byte[] { 5, 1 }) };
+ this.verbose = true;
+ this.delay = 100;
+ // this.numBuffers = this.bufs.length;
+ }
+ };
+
+ Element fakeSink = new FakeSink("sink") {
+ {
+ this.verbose = true;
+ }
+ };
+
+ Link link = new SyncLink();
+
+ mockSource.setLink(STDOUT, link, Direction.OUT);
+ fakeSink.setLink(STDIN, link, Direction.IN);
+
+ link.run();
+ }
+}
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java
new file mode 100644
index 00000000000..a7d48482621
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OneTimeSwitch.java
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+/**
+ * One time switch for handshake and initialization stages.
+ *
+ * At beginning, element handles data internally, sending output to "otout" pad.
+ * After switchOff() method is called, element drops its links, so packets from
+ * "stdin" pad are forwarded directly to "stdout" pad, without processing.
+ *
+ * Event STREAM_START is captured by this element and not propagated further.
+ * When switchOff() method is called, event STREAM_START is generated and sent
+ * to "stdout".
+ */
+public abstract class OneTimeSwitch extends BaseElement {
+
+ /**
+ * One-time out - name of output pad for one time logic. By default, output
+ * directly to socket.
+ */
+ public static final String OTOUT = "otout";
+
+ private boolean switched = false;
+
+ public OneTimeSwitch(String id) {
+ super(id);
+ declarePads();
+ }
+
+ protected void declarePads() {
+ inputPads.put(STDIN, null);
+ outputPads.put(OTOUT, null);
+ outputPads.put(STDOUT, null);
+ }
+
+ @Override
+ public void handleData(ByteBuffer buf, Link link) {
+ if (switched)
+ throw new RuntimeException(this + " element is switched off and must not receive any data or events anymore.");
+
+ if (buf == null)
+ return;
+
+ handleOneTimeData(buf, link);
+ }
+
+ public void pushDataToOTOut(ByteBuffer buf) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Sending data: " + buf + ".");
+
+ outputPads.get(OTOUT).sendData(buf);
+ }
+
+ /**
+ * Switch this element off. Pass data directly to main output(s).
+ */
+ public void switchOff() {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Switching OFF.");
+
+ switched = true;
+ verbose = false;
+
+ // Rewire links: drop otout link, replace stdout link by stdin to send data
+ // directly to stdout
+ Link stdout = (Link) outputPads.get(STDOUT);
+ Link stdin = (Link) inputPads.get(STDIN);
+ Link otout = (Link) outputPads.get(OTOUT);
+
+ otout.drop();
+
+ // Wake up next peer(s)
+ sendEventToAllPads(Event.STREAM_START, Direction.OUT);
+
+ stdin.setSink(null);
+ inputPads.remove(STDIN);
+
+ Element nextPeer = stdout.getSink();
+ nextPeer.replaceLink(stdout, stdin);
+ stdout.drop();
+
+ for (Object link : inputPads.values().toArray())
+ ((Link) link).drop();
+ for (Object link : outputPads.values().toArray())
+ ((Link) link).drop();
+
+ }
+
+ public void switchOn() {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Switching ON.");
+
+ switched = false;
+ }
+
+ /**
+ * Override this method to handle one-time packet(s) at handshake or
+ * initialization stages. Execute method @see switchRoute() when this method
+ * is no longer necessary.
+ */
+ protected abstract void handleOneTimeData(ByteBuffer buf, Link link);
+
+ @Override
+ public void handleEvent(Event event, Direction direction) {
+ if (event == Event.STREAM_START) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Event " + event + " is received.");
+
+ switchOn();
+
+ // Execute this element onStart(), but do not propagate event further,
+ // to not wake up next elements too early
+ onStart();
+ } else
+ super.handleEvent(event, direction);
+ }
+
+}
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Order.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Order.java
new file mode 100644
index 00000000000..1d63850d9ca
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Order.java
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+public class Order {
+
+ public Object type;
+
+}
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java
new file mode 100644
index 00000000000..d1aa5ce4aca
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/OutputStreamSink.java
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class OutputStreamSink extends BaseElement {
+
+ protected OutputStream os;
+ protected SocketWrapper socketWrapper;
+
+ public OutputStreamSink(String id) {
+ super(id);
+ }
+
+ public OutputStreamSink(String id, OutputStream os) {
+ super(id);
+ this.os = os;
+ }
+
+ public OutputStreamSink(String id, SocketWrapper socketWrapper) {
+ super(id);
+ this.socketWrapper = socketWrapper;
+ }
+
+ public void setOutputStream(OutputStream os) {
+ this.os = os;
+ // Resume links
+ resumeLinks();
+ }
+
+ /**
+ * Send incoming data to stream.
+ */
+ @Override
+ public void handleData(ByteBuffer buf, Link link) {
+ if (buf == null)
+ return;
+
+ try {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Writing data to stream: " + buf + ".");
+
+ os.write(buf.data, buf.offset, buf.length);
+ os.flush();
+ } catch (IOException e) {
+ System.err.println("[" + this + "] ERROR: " + e.getMessage());
+ closeStream();
+ }
+ }
+
+ @Override
+ public void handleEvent(Event event, Direction direction) {
+ switch (event) {
+ case SOCKET_UPGRADE_TO_SSL:
+ socketWrapper.upgradeToSsl();
+ break;
+ default:
+ super.handleEvent(event, direction);
+ }
+ }
+
+ @Override
+ public void setLink(String padName, Link link, Direction direction) {
+ switch (direction) {
+ case IN:
+ super.setLink(padName, link, direction);
+
+ if (os == null)
+ // Pause links until data stream will be ready
+ link.pause();
+ break;
+ case OUT:
+ throw new RuntimeException("Cannot assign link to output pad in sink element. Element: " + this + ", pad: " + padName + ", link: " + link + ".");
+ }
+ }
+
+ private void resumeLinks() {
+ for (DataSource source : inputPads.values())
+ ((Link) source).resume();
+ }
+
+ @Override
+ protected void onClose() {
+ closeStream();
+ }
+
+ private void closeStream() {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Closing stream.");
+
+ try {
+ os.close();
+ } catch (IOException e) {
+ }
+ try {
+ sendEventToAllPads(Event.STREAM_CLOSE, Direction.IN);
+ } catch (Exception e) {
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "OutputStreamSink(" + id + ")";
+ }
+
+ /**
+ * Example.
+ */
+ public static void main(String args[]) {
+ Element source = new FakeSource("source") {
+ {
+ this.verbose = true;
+ this.numBuffers = 3;
+ this.incommingBufLength = 5;
+ this.delay = 100;
+ }
+ };
+
+ OutputStreamSink sink = new OutputStreamSink("sink") {
+ {
+ verbose = true;
+ }
+ };
+
+ Link link = new SyncLink();
+
+ source.setLink(STDOUT, link, Direction.OUT);
+ sink.setLink(STDIN, link, Direction.IN);
+
+ sink.setOutputStream(new ByteArrayOutputStream());
+
+ link.sendEvent(Event.STREAM_START, Direction.IN);
+ link.run();
+
+ }
+}
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java
new file mode 100644
index 00000000000..c369350cf38
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Pipeline.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+/**
+ * Pipeline groups multiple elements.
+ */
+public interface Pipeline extends Element {
+
+ static final String IN = Direction.IN.toString();
+ static final String OUT = Direction.OUT.toString();
+
+ /**
+ * Add elements to pipeline.
+ *
+ * @param elements
+ */
+ void add(Element... elements);
+
+ /**
+ * Add elements to pipeline and link them in given order.
+ *
+ * @param elements
+ */
+ void addAndLink(Element... elements);
+
+ /**
+ * Link elements in given order using SyncLink. Element name can have prefix
+ * "PADNAME< " or/and suffix " >PADNAME" to use given named pads instead of
+ * "stdin" and "stdout". I.e. link("foo", "bar", "baz"); is equal
+ * to link("foo >stdin", "stdout< bar >stdin", "stdout< baz"); .
+ *
+ * Special elements "IN" and "OUT" are pointing to pipeline outer interfaces,
+ * so when pipeline will be connected with other elements, outside of this
+ * pipeline, they will be connected to IN and OUT elements.
+ *
+ * Example:
+ *
+ *
+ * pipeline.link("IN", "foo", "bar", "OUT");
+ * // Make additional branch from foo to baz, and then to OUT
+ * pipeline.link("foo >baz_out", "baz", "baz_in< OUT");
+ *
+ *
+ * @param elements
+ * elements to link
+ */
+ void link(String... elements);
+
+ /**
+ * Get element by name.
+ *
+ * @return an element
+ */
+ Element get(String elementName);
+
+ /**
+ * Get link by element name and pad name.
+ */
+ Link getLink(String elementName, String padName);
+
+ /**
+ * Set link by element name and pad name. Allows to link external elements
+ * into internal elements of pipeline. Special elements "IN" and "OUT" are
+ * pointing to pipeline outer interfaces.
+ */
+ void setLink(String elementName, String padName, Link link, Direction direction);
+
+ /**
+ * Get link connected to given pad in given element and run it main loop.
+ * @param separateThread
+ * set to true to start main loop in separate thread.
+ * @param waitForStartEvent TODO
+ */
+ void runMainLoop(String element, String padName, boolean separateThread, boolean waitForStartEvent);
+
+}
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java
new file mode 100644
index 00000000000..abf132f6aef
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/PipelineImpl.java
@@ -0,0 +1,309 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class PipelineImpl implements Pipeline {
+
+ protected String id;
+ protected boolean verbose = System.getProperty("streamer.Pipeline.debug", "false").equals("true");
+
+ public PipelineImpl(String id) {
+ this.id = id;
+ elements = initElementMap(id);
+ }
+
+ protected Map elements;
+
+ protected HashMap initElementMap(String id) {
+ HashMap map = new HashMap();
+
+ map.put(IN, new BaseElement(id + "." + IN));
+ map.put(OUT, new BaseElement(id + "." + OUT));
+ return map;
+ }
+
+ @Override
+ public Link getLink(String padName) {
+ Link link = elements.get(IN).getLink(padName);
+ if (link == null)
+ link = elements.get(OUT).getLink(padName);
+ return link;
+ }
+
+ @Override
+ public Set getPads(Direction direction) {
+ switch (direction) {
+ case IN:
+ return elements.get(IN).getPads(direction);
+
+ case OUT:
+ return elements.get(OUT).getPads(direction);
+ }
+ return null;
+ }
+
+ @Override
+ public void validate() {
+ for (Element element : elements.values())
+ element.validate();
+
+ // Check IN element
+ {
+ Element element = get(IN);
+ int outPadsNumber = element.getPads(Direction.OUT).size();
+ int inPadsNumber = element.getPads(Direction.IN).size();
+ if ((outPadsNumber | inPadsNumber) > 0 && (outPadsNumber == 0 || inPadsNumber == 0))
+ throw new RuntimeException("[ " + this + "] Pads of input element of pipeline are not balanced. Element: " + element + ", output pads: "
+ + element.getPads(Direction.OUT).toString() + ", input pads: " + element.getPads(Direction.IN).toString() + ".");
+ }
+
+ // Check OUT element
+ {
+ Element element = get(OUT);
+ int outPadsNumber = element.getPads(Direction.OUT).size();
+ int inPadsNumber = element.getPads(Direction.IN).size();
+ if ((outPadsNumber | inPadsNumber) > 0 && (outPadsNumber == 0 || inPadsNumber == 0))
+ throw new RuntimeException("[ " + this + "] Pads of output element of pipeline are not balanced. Element: " + element + ", output pads: "
+ + element.getPads(Direction.OUT).toString() + ", input pads: " + element.getPads(Direction.IN).toString() + ".");
+ }
+
+ }
+
+ @Override
+ public void dropLink(String padName) {
+ if (elements.get(IN).getLink(padName) != null)
+ elements.get(IN).dropLink(padName);
+
+ if (elements.get(OUT).getLink(padName) != null)
+ elements.get(OUT).dropLink(padName);
+ }
+
+ @Override
+ public void dropLink(Link link) {
+ elements.get(IN).dropLink(link);
+ elements.get(OUT).dropLink(link);
+ }
+
+ @Override
+ public void replaceLink(Link existingLink, Link newLink) {
+ elements.get(IN).replaceLink(existingLink, newLink);
+ elements.get(OUT).replaceLink(existingLink, newLink);
+ }
+
+ @Override
+ public void setLink(String padName, Link link, Direction direction) {
+ // Wire links to internal elements instead
+ elements.get(direction.toString()).setLink(padName, link, direction);
+ }
+
+ @Override
+ public void poll(boolean block) {
+ throw new RuntimeException("Not implemented.");
+ }
+
+ @Override
+ public void handleData(ByteBuffer buf, Link link) {
+ get(IN).handleData(buf, link);
+ }
+
+ @Override
+ public void handleEvent(Event event, Direction direction) {
+ switch (direction) {
+ case IN:
+ get(IN).handleEvent(event, direction);
+ break;
+ case OUT:
+ get(OUT).handleEvent(event, direction);
+ break;
+ }
+ }
+
+ @Override
+ public void add(Element... elements) {
+ for (Element element : elements) {
+ String id = element.getId();
+
+ if (this.elements.containsKey(id))
+ throw new RuntimeException("This pipeline already contains element with same ID. New element: " + element + ", existing element: "
+ + this.elements.get(id) + ".");
+
+ this.elements.put(id, element);
+ }
+ }
+
+ @Override
+ public void link(String... elementNames) {
+
+ if (elementNames.length < 2)
+ throw new RuntimeException("At least two elements are necessary to create link between them.");
+
+ // Parse array of element and pad names
+
+ Element elements[] = new Element[elementNames.length];
+ String inputPads[] = new String[elementNames.length];
+ String outputPads[] = new String[elementNames.length];
+
+ int i = 0;
+ for (String elementName : elementNames) {
+ if (elementName.contains("< ")) {
+ inputPads[i] = elementName.substring(0, elementName.indexOf("< "));
+ elementName = elementName.substring(elementName.indexOf("< ") + 2);
+ } else {
+ inputPads[i] = STDIN;
+ }
+
+ if (elementName.contains(" >")) {
+ outputPads[i] = elementName.substring(elementName.indexOf(" >") + 2);
+ elementName = elementName.substring(0, elementName.indexOf(" >"));
+ } else {
+ outputPads[i] = STDOUT;
+ }
+
+ elements[i] = get(elementName);
+
+ if (elements[i] == null)
+ throw new RuntimeException("Cannot find element by name in this pipeline. Element name: \"" + elementName + "\" (" + elementNames[i] + "), pipeline: "
+ + this + ".");
+
+ i++;
+ }
+
+ // Link elements
+ for (i = 0; i < elements.length - 1; i++) {
+ Element leftElement = elements[i];
+ Element rightElement = elements[i + 1];
+ String leftPad = outputPads[i];
+ String rightPad = inputPads[i + 1];
+
+ String linkId = leftElement.getId() + " >" + leftPad + " | " + rightPad + "< " + rightElement.getId();
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Linking: " + linkId + ".");
+
+ Link link = new SyncLink(linkId);
+ leftElement.setLink(leftPad, link, Direction.OUT);
+ rightElement.setLink(rightPad, link, Direction.IN);
+ }
+ }
+
+ @Override
+ public void addAndLink(Element... elements) {
+ add(elements);
+ link(elements);
+ }
+
+ private void link(Element... elements) {
+ String elementNames[] = new String[elements.length];
+
+ int i = 0;
+ for (Element element : elements) {
+ elementNames[i++] = element.getId();
+ }
+
+ link(elementNames);
+ }
+
+ @Override
+ public Element get(String elementName) {
+ return elements.get(elementName);
+ }
+
+ @Override
+ public Link getLink(String elementName, String padName) {
+ return elements.get(elementName).getLink(padName);
+
+ }
+
+ @Override
+ public void setLink(String elementName, String padName, Link link, Direction direction) {
+ elements.get(elementName).setLink(padName, link, direction);
+ }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void runMainLoop(String elementName, String padName, boolean separateThread, boolean waitForStartEvent) {
+ validate();
+
+ Link link = getLink(elementName, padName);
+
+ if (link == null)
+ throw new NullPointerException("Cannot find link. Element name: " + elementName + ", element: " + get(elementName) + ", pad: " + padName + ".");
+
+ if (!waitForStartEvent)
+ link.sendEvent(Event.STREAM_START, Direction.OUT);
+
+ if (separateThread) {
+ Thread thread = new Thread(link);
+ thread.setDaemon(true);
+ thread.start();
+ } else {
+ link.run();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Pipeline(" + id + ")";
+ }
+
+ /**
+ * Example.
+ */
+ public static void main(String args[]) {
+ // System.setProperty("streamer.Link.debug", "true");
+ // System.setProperty("streamer.Element.debug", "true");
+ // System.setProperty("streamer.Pipeline.debug", "true");
+
+ Pipeline pipeline = new PipelineImpl("main");
+
+ // Create elements
+ pipeline.add(new FakeSource("source") {
+ {
+ this.incommingBufLength = 3;
+ this.numBuffers = 10;
+ this.delay = 100;
+ }
+ });
+ pipeline.add(new BaseElement("tee"));
+ pipeline.add(new FakeSink("sink") {
+ {
+ this.verbose = true;
+ }
+ });
+ pipeline.add(new FakeSink("sink2") {
+ {
+ this.verbose = true;
+ }
+ });
+
+ // Link elements
+ pipeline.link("source", "tee", "sink");
+ pipeline.link("tee >out2", "sink2");
+
+ // Run main loop
+ pipeline.runMainLoop("source", STDOUT, false, false);
+ }
+
+}
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java
new file mode 100644
index 00000000000..7a1734053d9
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/Queue.java
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Message queue for safe transfer of packets between threads.
+ */
+public class Queue extends BaseElement {
+
+ protected LinkedBlockingQueue queue = new LinkedBlockingQueue();
+
+ public Queue(String id) {
+ super(id);
+ }
+
+ @SuppressWarnings("incomplete-switch")
+ @Override
+ public void poll(boolean block) {
+ try {
+ ByteBuffer buf = null;
+ if (block) {
+ buf = queue.take();
+ } else {
+ buf = queue.poll(100, TimeUnit.MILLISECONDS);
+ }
+
+ if (buf != null)
+ pushDataToAllOuts(buf);
+
+ } catch (Exception e) {
+ sendEventToAllPads(Event.STREAM_CLOSE, Direction.OUT);
+ closeQueue();
+ }
+ }
+
+ @Override
+ public void handleData(ByteBuffer buf, Link link) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Data received: " + buf + ".");
+
+ // Put incoming data into queue
+ try {
+ queue.put(buf);
+ } catch (Exception e) {
+ sendEventToAllPads(Event.STREAM_CLOSE, Direction.IN);
+ closeQueue();
+ }
+ }
+
+ @Override
+ public void handleEvent(Event event, Direction direction) {
+ switch (event) {
+ case LINK_SWITCH_TO_PULL_MODE:
+ // Do not propagate this event, because this element is boundary between
+ // threads
+ break;
+ default:
+ super.handleEvent(event, direction);
+ }
+ }
+
+ @Override
+ protected void onClose() {
+ super.onClose();
+ closeQueue();
+ }
+
+ private void closeQueue() {
+ queue.clear();
+ queue.add(null);
+ // Drop queue to indicate that upstream is closed.
+ // May produce NPE in poll().
+ queue = null;
+ }
+
+ @Override
+ public String toString() {
+ return "Queue(" + id + ")";
+ }
+
+ /**
+ * Example.
+ */
+ public static void main(String args[]) {
+ // System.setProperty("streamer.Link.debug", "true");
+ System.setProperty("streamer.Element.debug", "true");
+
+ Element source1 = new FakeSource("source1") {
+ {
+ this.delay = 100;
+ this.numBuffers = 10;
+ this.incommingBufLength = 10;
+ }
+ };
+
+ Element source2 = new FakeSource("source2") {
+ {
+ this.delay = 100;
+ this.numBuffers = 10;
+ this.incommingBufLength = 10;
+ }
+ };
+
+ Pipeline pipeline = new PipelineImpl("test");
+ pipeline.add(source1);
+ pipeline.add(source2);
+ pipeline.add(new Queue("queue"));
+ pipeline.add(new FakeSink("sink"));
+
+ // Main flow
+ pipeline.link("source1", "in1< queue");
+ pipeline.link("source2", "in2< queue");
+ pipeline.link("queue", "sink");
+
+ new Thread(pipeline.getLink("source1", STDOUT)).start();
+ new Thread(pipeline.getLink("source2", STDOUT)).start();
+ pipeline.getLink("sink", STDIN).run();
+ }
+}
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapper.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapper.java
new file mode 100644
index 00000000000..2ddf0b635b9
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SocketWrapper.java
@@ -0,0 +1,239 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+import static rdpclient.MockServer.Packet.PacketType.CLIENT;
+import static rdpclient.MockServer.Packet.PacketType.SERVER;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.HashMap;
+
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManager;
+
+import rdpclient.MockServer;
+import rdpclient.MockServer.Packet;
+import rdpclient.TrustAllX509TrustManager;
+
+public class SocketWrapper extends PipelineImpl {
+
+ protected InputStreamSource source;
+ protected OutputStreamSink sink;
+ protected Socket socket;
+ protected InetSocketAddress address;
+
+ protected SSLSocket sslSocket;
+
+ //protected String SSL_VERSION_TO_USE = "TLSv1.2";
+ /*DEBUG*/protected String SSL_VERSION_TO_USE = "TLSv1";
+
+ public SocketWrapper(String id) {
+ super(id);
+ }
+
+ @Override
+ protected HashMap initElementMap(String id) {
+ HashMap map = new HashMap();
+
+ source = new InputStreamSource(id + "." + OUT, this);
+ sink = new OutputStreamSink(id + "." + IN, this);
+
+ // Pass requests to read data to socket input stream
+ map.put(OUT, source);
+
+ // All incoming data, which is sent to this socket wrapper, will be sent
+ // to socket remote
+ map.put(IN, sink);
+
+ return map;
+ }
+
+ /**
+ * Connect this socket wrapper to remote server and start main loop on
+ * IputStreamSource stdout link, to watch for incoming data, and
+ * OutputStreamSink stdin link, to pull for outgoing data.
+ *
+ * @param address
+ * @throws IOException
+ */
+ public void connect(InetSocketAddress address) throws IOException {
+ this.address = address;
+
+ // Connect socket to server
+ socket = SocketFactory.getDefault().createSocket();
+ try {
+ socket.connect(address);
+
+ InputStream is = socket.getInputStream();
+ source.setInputStream(is);
+
+ OutputStream os = socket.getOutputStream();
+ sink.setOutputStream(os);
+
+ // Start polling for data to send to remote sever
+ runMainLoop(IN, STDIN, true, true);
+
+ // Push incoming data from server to handlers
+ runMainLoop(OUT, STDOUT, false, false);
+
+ } finally {
+ socket.close();
+ }
+ }
+
+ @Override
+ public void handleEvent(Event event, Direction direction) {
+ switch (event) {
+ case SOCKET_UPGRADE_TO_SSL:
+ upgradeToSsl();
+ break;
+ default:
+ super.handleEvent(event, direction);
+ break;
+ }
+ }
+
+ public void upgradeToSsl() {
+
+ if (sslSocket != null)
+ // Already upgraded
+ return;
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Upgrading socket to SSL.");
+
+ try {
+ // Use most secure implementation of SSL available now.
+ // JVM will try to negotiate TLS1.2, then will fallback to TLS1.0, if
+ // TLS1.2 is not supported.
+ SSLContext sslContext = SSLContext.getInstance(SSL_VERSION_TO_USE);
+
+ // Trust all certificates (FIXME: insecure)
+ sslContext.init(null, new TrustManager[] { new TrustAllX509TrustManager() }, null);
+
+ SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
+ sslSocket = (SSLSocket) sslSocketFactory.createSocket(socket, address.getHostString(), address.getPort(), true);
+ sslSocket.startHandshake();
+
+ InputStream sis = sslSocket.getInputStream();
+ source.setInputStream(sis);
+
+ OutputStream sos = sslSocket.getOutputStream();
+ sink.setOutputStream(sos);
+
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot upgrade socket to SSL: " + e.getMessage(), e);
+ }
+
+ }
+
+ @Override
+ public void validate() {
+ for (Element element : elements.values())
+ element.validate();
+
+ if (get(IN).getPads(Direction.IN).size() == 0)
+ throw new RuntimeException("[ " + this + "] Input of socket is not connected.");
+
+ if (get(OUT).getPads(Direction.OUT).size() == 0)
+ throw new RuntimeException("[ " + this + "] Output of socket is not connected.");
+
+ }
+
+ public void shutdown() {
+ try {
+ handleEvent(Event.STREAM_CLOSE, Direction.IN);
+ } catch (Exception e) {
+ }
+ try {
+ handleEvent(Event.STREAM_CLOSE, Direction.OUT);
+ } catch (Exception e) {
+ }
+ try {
+ if (sslSocket != null)
+ sslSocket.close();
+ } catch (Exception e) {
+ }
+ try {
+ socket.close();
+ } catch (Exception e) {
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "SocketWrapper(" + id + ")";
+ }
+
+ /**
+ * Example.
+ */
+ public static void main(String args[]) {
+ try {
+ System.setProperty("streamer.Link.debug", "true");
+ System.setProperty("streamer.Element.debug", "true");
+ System.setProperty("rdpclient.MockServer.debug", "true");
+
+ Pipeline pipeline = new PipelineImpl("echo client");
+
+ SocketWrapper socketWrapper = new SocketWrapper("socket");
+
+ pipeline.add(socketWrapper);
+ pipeline.add(new BaseElement("echo"));
+
+ pipeline.link("socket", "echo", "socket");
+
+ final byte[] mockData = new byte[] { 0x01, 0x02, 0x03 };
+ MockServer server = new MockServer(new Packet[] { new Packet("Server hello") {
+ {
+ type = SERVER;
+ data = mockData;
+ }
+ }, new Packet("Client hello") {
+ {
+ type = CLIENT;
+ data = mockData;
+ }
+ }, new Packet("Server hello") {
+ {
+ type = SERVER;
+ data = mockData;
+ }
+ }, new Packet("Client hello") {
+ {
+ type = CLIENT;
+ data = mockData;
+ }
+ } });
+ server.start();
+ InetSocketAddress address = server.getAddress();
+
+ socketWrapper.connect(address);
+
+ } catch (IOException e) {
+ e.printStackTrace(System.err);
+ }
+
+ }
+}
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SyncLink.java b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SyncLink.java
new file mode 100644
index 00000000000..32c14bb947f
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/streamer/SyncLink.java
@@ -0,0 +1,402 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package streamer;
+
+/**
+ * Link to transfer data in bounds of single thread (synchronized transfer).
+ * Must not be used to send data to elements served in different threads.
+ */
+public class SyncLink implements Link {
+
+ /**
+ * When null packet is pulled from source element, then make slight delay to
+ * avoid consuming of 100% of CPU in main loop in cases when link is pauses or
+ * source element cannot produce data right now.
+ */
+ protected static final long STANDARD_DELAY_FOR_EMPTY_PACKET = 10; // Milliseconds
+
+ /**
+ * Delay for null packets in poll method when blocking is requested, in
+ * milliseconds.
+ */
+ protected long delay = STANDARD_DELAY_FOR_EMPTY_PACKET;
+
+ /**
+ * Set to true to print debugging messages.
+ */
+ protected boolean verbose = System.getProperty("streamer.Link.debug", "false").equals("true");;
+
+ /**
+ * ID of this link.
+ */
+ protected String id = null;
+
+ /**
+ * Buffer with data to hold because link is paused, or data is pushed back.
+ */
+ protected ByteBuffer cacheBuffer = null;
+
+ /**
+ * Size of expected packet. Data must be hold in link until full packet will
+ * be read.
+ */
+ protected int expectedPacketSize = 0;
+
+ /**
+ * Number of packets and packet header transferred to element.
+ */
+ protected int packetNumber = 0;
+
+ /**
+ * Set to true to hold all data in link until it will be set to false again.
+ */
+ protected boolean paused = false;
+
+ /**
+ * Element to pull data from, when in pull mode.
+ */
+ protected Element source = null;
+
+ /**
+ * Element to send data to in both pull and push modes.
+ */
+ protected Element sink = null;
+
+ /**
+ * When in loop, indicates that loop must be stopped.
+ *
+ * @see run()
+ */
+ private boolean shutdown = false;
+
+ /**
+ * Indicates that event STREAM_START is passed over this link, so main loop
+ * can be started to pull data from source element.
+ */
+ protected boolean start;
+
+ /**
+ * Operate in pull mode.
+ */
+ protected boolean pullMode;
+
+ public SyncLink() {
+ }
+
+ public SyncLink(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public void pushBack(ByteBuffer buf) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Buffer pushed back: " + buf + ".");
+
+ if (cacheBuffer != null) {
+ ByteBuffer tmp = cacheBuffer.join(buf);
+ cacheBuffer.unref();
+ cacheBuffer = tmp;
+ } else {
+ cacheBuffer = buf;
+ cacheBuffer.ref();
+ }
+
+ resetCursor();
+ }
+
+ private void resetCursor() {
+ // Reset cursor
+ cacheBuffer.cursor = 0;
+ }
+
+ @Override
+ public void pushBack(ByteBuffer buf, int lengthOfFullPacket) {
+ pushBack(buf);
+ expectedPacketSize = lengthOfFullPacket;
+ }
+
+ @Override
+ public String toString() {
+ return "SyncLink(" + ((id != null) ? id + ", " : "") + source + ":" + sink + ")";
+ }
+
+ /**
+ * Push data to sink. Call with null to push cached data.
+ */
+ @Override
+ public void sendData(ByteBuffer buf) {
+ if (!paused && pullMode)
+ throw new RuntimeException("[" + this + "] ERROR: link is not in push mode.");
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Incoming buffer: " + buf + ".");
+
+ if (buf == null && cacheBuffer == null)
+ return;
+
+ if (cacheBuffer != null && buf != null) {
+ // Join old data with fresh data
+ buf = cacheBuffer.join(buf);
+ cacheBuffer.unref();
+ cacheBuffer = buf;
+ }
+
+ // Store buffer in cache field to simplify following loop
+ if (buf != null)
+ cacheBuffer = buf;
+
+ // When data pushed back and length of data is less than length of full
+ // packet, then feed data to sink element immediately
+ while (cacheBuffer != null) {
+ if (paused) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Transfer is paused. Data in cache buffer: " + cacheBuffer + ".");
+
+ // Wait until rest of packet will be read
+ return;
+ }
+
+ if (expectedPacketSize > 0 && cacheBuffer.length < expectedPacketSize) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Transfer is suspended because available data is less than expected packet size. Expected packet size: "
+ + expectedPacketSize + ", data in cache buffer: " + cacheBuffer + ".");
+
+ // Wait until rest of packet will be read
+ return;
+ }
+
+ // Full packet or packet header is read, feed it to element
+ buf = cacheBuffer;
+ cacheBuffer = null;
+ expectedPacketSize = 0;
+ packetNumber++;
+
+ if (sink == null)
+ throw new NullPointerException("[" + this + "] ERROR: Cannot send data to sink: sink is null. Data: " + buf + ".");
+
+ sink.handleData(buf, this);
+ // cacheBuffer and expectedPacketSize can be changed at this time
+ }
+
+ }
+
+ @SuppressWarnings("incomplete-switch")
+ @Override
+ public void sendEvent(Event event, Direction direction) {
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Event " + event + " is received.");
+
+ // Shutdown main loop (if any) when STREAM_CLOSE event is received.
+ switch (event) {
+ case STREAM_START: {
+ if (!start)
+ start = true;
+ else
+ // Event already sent trough this link
+ return;
+ break;
+ }
+ case STREAM_CLOSE: {
+ if (!shutdown)
+ shutdown = true;
+ else
+ // Event already sent trough this link
+ return;
+ break;
+ }
+ case LINK_SWITCH_TO_PULL_MODE: {
+ setPullMode();
+ break;
+ }
+
+ }
+
+ switch (direction) {
+ case IN:
+ source.handleEvent(event, direction);
+ break;
+ case OUT:
+ sink.handleEvent(event, direction);
+ break;
+ }
+ }
+
+ @Override
+ public ByteBuffer pull(boolean block) {
+ if (!pullMode)
+ throw new RuntimeException("This link is not in pull mode.");
+
+ if (paused) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Cannot pull, link is paused.");
+
+ // Make slight delay in such case, to avoid consuming 100% of CPU
+ if (block) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ return null;
+ }
+
+ // If data in cache can be sent immediately,
+ // then return it instead of asking for more data from source
+ if (cacheBuffer != null && (expectedPacketSize == 0 || (expectedPacketSize > 0 && cacheBuffer.length >= expectedPacketSize))) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Data pulled from cache buffer: " + cacheBuffer + ".");
+
+ ByteBuffer tmp = cacheBuffer;
+ cacheBuffer = null;
+ return tmp;
+ }
+
+ // Pause this link, so incoming data will not be sent to sink
+ // immediately, then ask source element for more data
+ pause();
+ source.poll(block);
+ resume();
+
+ // Can return something only when data was stored in buffer
+ if (cacheBuffer != null && (expectedPacketSize == 0 || (expectedPacketSize > 0 && cacheBuffer.length >= expectedPacketSize))) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Data pulled from source: " + cacheBuffer + ".");
+
+ ByteBuffer tmp = cacheBuffer;
+ cacheBuffer = null;
+ return tmp;
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public Element setSink(Element sink) {
+ if (sink != null && this.sink != null)
+ throw new RuntimeException("This link sink element is already set. Link: " + this + ", new sink: " + sink + ", existing sink: " + this.sink + ".");
+
+ if (sink == null && cacheBuffer != null)
+ throw new RuntimeException("Cannot drop link: cache is not empty. Link: " + this + ", cache: " + cacheBuffer);
+
+ this.sink = sink;
+
+ return sink;
+ }
+
+ @Override
+ public Element setSource(Element source) {
+ if (this.source != null && source != null)
+ throw new RuntimeException("This link source element is already set. Link: " + this + ", new source: " + source + ", existing source: " + this.source
+ + ".");
+
+ this.source = source;
+ return source;
+ }
+
+ @Override
+ public Element getSource() {
+ return source;
+ }
+
+ @Override
+ public Element getSink() {
+ return sink;
+ }
+
+ @Override
+ public void pause() {
+ if (paused)
+ throw new RuntimeException("Link is already paused.");
+
+ paused = true;
+
+ }
+
+ @Override
+ public void resume() {
+ paused = false;
+ }
+
+ /**
+ * Run pull loop to actively pull data from source and push it to sink. It
+ * must be only one pull loop per thread.
+ *
+ * Pull loop will start after event STREAM_START. This link and source element
+ * incomming links will be switched to pull mode before pull loop will be
+ * started using event LINK_SWITCH_TO_PULL_MODE.
+ */
+ @Override
+ public void run() {
+ // Wait until even STREAM_START will arrive
+ while (!start) {
+ delay();
+ }
+
+ sendEvent(Event.LINK_SWITCH_TO_PULL_MODE, Direction.IN);
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Starting pull loop.");
+
+ // Pull source in loop
+ while (!shutdown) {
+ // Pull data from source element and send it to sink element
+ ByteBuffer data = pull(true);
+ if (data != null)
+ sink.handleData(data, this);
+
+ if (!shutdown && data == null) {
+ // Make slight delay to avoid consuming of 100% of CPU
+ delay();
+ }
+ }
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Pull loop finished.");
+
+ }
+
+ protected void delay() {
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ e.printStackTrace(System.err);
+ throw new RuntimeException("Interrupted in main loop.", e);
+ }
+ }
+
+ @Override
+ public void setPullMode() {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Switching to PULL mode.");
+
+ this.pullMode = true;
+ }
+
+ @Override
+ public void drop() {
+ if (pullMode)
+ throw new RuntimeException("Cannot drop link in pull mode.");
+
+ if (cacheBuffer != null)
+ throw new RuntimeException("Cannot drop link when cache conatains data: " + cacheBuffer + ".");
+
+ source.dropLink(this);
+ sink.dropLink(this);
+ }
+}
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/AwtKeyboardEventToVncAdapter.java b/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/AwtKeyboardEventToVncAdapter.java
new file mode 100644
index 00000000000..5537d24d29b
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/AwtKeyboardEventToVncAdapter.java
@@ -0,0 +1,369 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package vncclient;
+
+import java.awt.event.KeyEvent;
+
+import streamer.BaseElement;
+import streamer.ByteBuffer;
+import streamer.Link;
+
+import common.KeyOrder;
+
+public class AwtKeyboardEventToVncAdapter extends BaseElement {
+
+ protected boolean sh = false;
+ protected boolean caps = false;
+ protected boolean num = false;
+
+ public AwtKeyboardEventToVncAdapter(String id) {
+ super(id);
+ }
+
+ @Override
+ public void handleData(ByteBuffer buf, Link link) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Data received: " + buf + ".");
+
+ KeyOrder order = (KeyOrder) buf.getOrder();
+ buf.unref();
+
+ ByteBuffer outBuf = new ByteBuffer(8);
+ outBuf.writeByte(RfbConstants.CLIENT_KEYBOARD_EVENT);
+
+ outBuf.writeByte((order.pressed) ? RfbConstants.KEY_DOWN : RfbConstants.KEY_UP);
+ outBuf.writeShort(0); // padding
+ outBuf.writeInt(map_en_us(order));
+
+ pushDataToAllOuts(outBuf);
+ }
+
+ /**
+ * Return key scan code (in lower byte) and extended flags (in second byte).
+ */
+ private int map_en_us(KeyOrder order) {
+
+ switch (order.event.getKeyCode()) {
+ // Functional keys
+ case KeyEvent.VK_ESCAPE:
+ return 0xff1b;
+ case KeyEvent.VK_F1:
+ return 0xffbe;
+ case KeyEvent.VK_F2:
+ return 0xffbf;
+ case KeyEvent.VK_F3:
+ return 0xffc0;
+ case KeyEvent.VK_F4:
+ return 0xffc1;
+ case KeyEvent.VK_F5:
+ return 0xffc2;
+ case KeyEvent.VK_F6:
+ return 0xffc3;
+ case KeyEvent.VK_F7:
+ return 0xffc4;
+ case KeyEvent.VK_F8:
+ return 0xffc5;
+ case KeyEvent.VK_F9:
+ return 0xffc6;
+ case KeyEvent.VK_F10:
+ return 0xffc7;
+ case KeyEvent.VK_F11:
+ return 0xffc8;
+ case KeyEvent.VK_F12:
+ return 0xffc9;
+
+ // Row #1
+ case KeyEvent.VK_BACK_QUOTE:
+ return (sh) ? '~' : '`';
+ case KeyEvent.VK_1:
+ return (sh) ? '!' : '1';
+ case KeyEvent.VK_2:
+ return (sh) ? '@' : '2';
+ case KeyEvent.VK_3:
+ return (sh) ? '#' : '3';
+ case KeyEvent.VK_4:
+ return (sh) ? '$' : '4';
+ case KeyEvent.VK_5:
+ return (sh) ? '%' : '5';
+ case KeyEvent.VK_6:
+ return (sh) ? '^' : '6';
+ case KeyEvent.VK_7:
+ return (sh) ? '&' : '7';
+ case KeyEvent.VK_8:
+ return (sh) ? '*' : '8';
+ case KeyEvent.VK_9:
+ return (sh) ? '(' : '9';
+ case KeyEvent.VK_0:
+ return (sh) ? ')' : '0';
+ case KeyEvent.VK_MINUS:
+ return (sh) ? '_' : '-';
+ case KeyEvent.VK_EQUALS:
+ return (sh) ? '+' : '=';
+ case KeyEvent.VK_BACK_SPACE:
+ return 0xff08;
+
+ // Row #2
+ case KeyEvent.VK_TAB:
+ return 0xff09;
+ case KeyEvent.VK_Q:
+ return (sh ^ caps) ? 'Q' : 'q';
+ case KeyEvent.VK_W:
+ return (sh ^ caps) ? 'W' : 'w';
+ case KeyEvent.VK_E:
+ return (sh ^ caps) ? 'E' : 'e';
+ case KeyEvent.VK_R:
+ return (sh ^ caps) ? 'R' : 'r';
+ case KeyEvent.VK_T:
+ return (sh ^ caps) ? 'T' : 't';
+ case KeyEvent.VK_Y:
+ return (sh ^ caps) ? 'Y' : 'y';
+ case KeyEvent.VK_U:
+ return (sh ^ caps) ? 'U' : 'u';
+ case KeyEvent.VK_I:
+ return (sh ^ caps) ? 'I' : 'i';
+ case KeyEvent.VK_O:
+ return (sh ^ caps) ? 'O' : 'o';
+ case KeyEvent.VK_P:
+ return (sh ^ caps) ? 'P' : 'p';
+ case KeyEvent.VK_OPEN_BRACKET:
+ return (sh) ? '{' : '[';
+ case KeyEvent.VK_CLOSE_BRACKET:
+ return (sh) ? '{' : ']';
+ case KeyEvent.VK_ENTER:
+ switch (order.event.getKeyLocation()) {
+ default:
+ case KeyEvent.KEY_LOCATION_STANDARD:
+ return 0xff0d;
+ case KeyEvent.KEY_LOCATION_NUMPAD:
+ return 0xff8d;
+ }
+
+ // Row #3
+ case KeyEvent.VK_CAPS_LOCK:
+ if (order.pressed)
+ caps = !caps;
+ return 0xFFE5;
+ case KeyEvent.VK_A:
+ return (sh ^ caps) ? 'A' : 'a';
+ case KeyEvent.VK_S:
+ return (sh ^ caps) ? 'S' : 's';
+ case KeyEvent.VK_D:
+ return (sh ^ caps) ? 'D' : 'd';
+ case KeyEvent.VK_F:
+ return (sh ^ caps) ? 'F' : 'f';
+ case KeyEvent.VK_G:
+ return (sh ^ caps) ? 'G' : 'g';
+ case KeyEvent.VK_H:
+ return (sh ^ caps) ? 'H' : 'h';
+ case KeyEvent.VK_J:
+ return (sh ^ caps) ? 'J' : 'j';
+ case KeyEvent.VK_K:
+ return (sh ^ caps) ? 'K' : 'k';
+ case KeyEvent.VK_L:
+ return (sh ^ caps) ? 'L' : 'l';
+ case KeyEvent.VK_SEMICOLON:
+ return (sh) ? ':' : ';';
+ case KeyEvent.VK_QUOTE:
+ return (sh) ? '"' : '\'';
+
+ // Row #4
+ case KeyEvent.VK_SHIFT:
+ sh = !sh;
+ switch (order.event.getKeyLocation()) {
+ default:
+ case KeyEvent.KEY_LOCATION_LEFT:
+ return 0xffe1;
+ case KeyEvent.KEY_LOCATION_RIGHT:
+ return 0xffe2;
+ }
+ case KeyEvent.VK_BACK_SLASH:
+ return (sh) ? '|' : '\\';
+ case KeyEvent.VK_Z:
+ return (sh ^ caps) ? 'Z' : 'z';
+ case KeyEvent.VK_X:
+ return (sh ^ caps) ? 'X' : 'x';
+ case KeyEvent.VK_C:
+ return (sh ^ caps) ? 'C' : 'c';
+ case KeyEvent.VK_V:
+ return (sh ^ caps) ? 'V' : 'v';
+ case KeyEvent.VK_B:
+ return (sh ^ caps) ? 'B' : 'b';
+ case KeyEvent.VK_N:
+ return (sh ^ caps) ? 'N' : 'n';
+ case KeyEvent.VK_M:
+ return (sh ^ caps) ? 'M' : 'M';
+ case KeyEvent.VK_COMMA:
+ return (sh) ? '<' : ',';
+ case KeyEvent.VK_PERIOD:
+ return (sh) ? '>' : '.';
+ case KeyEvent.VK_SLASH:
+ return (sh) ? '?' : '/';
+
+ //
+ // Bottom row
+ case KeyEvent.VK_CONTROL:
+ switch (order.event.getKeyLocation()) {
+ default:
+ case KeyEvent.KEY_LOCATION_LEFT:
+ return 0xFFE3;
+ case KeyEvent.KEY_LOCATION_RIGHT:
+ return 0xFFE4;
+ }
+ case KeyEvent.VK_WINDOWS:
+ switch (order.event.getKeyLocation()) {
+ default:
+ case KeyEvent.KEY_LOCATION_LEFT:
+ return 0xFFED; // HyperL
+ case KeyEvent.KEY_LOCATION_RIGHT:
+ return 0xFFEE; // HyperR
+ }
+ case KeyEvent.VK_META:
+ switch (order.event.getKeyLocation()) {
+ default:
+ case KeyEvent.KEY_LOCATION_LEFT:
+ return 0xFFE7; // MetaL
+ case KeyEvent.KEY_LOCATION_RIGHT:
+ return 0xFFE8; // MetaR
+ }
+
+ case KeyEvent.VK_ALT:
+ switch (order.event.getKeyLocation()) {
+ default:
+ case KeyEvent.KEY_LOCATION_LEFT:
+ return 0xFFE9;
+ case KeyEvent.KEY_LOCATION_RIGHT:
+ return 0xFFEA;
+ }
+ case KeyEvent.VK_ALT_GRAPH:
+ return 0xfe03;
+
+ case KeyEvent.VK_SPACE:
+ return ' ';
+
+ case KeyEvent.VK_CONTEXT_MENU:
+ return 0xff67;
+
+ //
+ // Special keys
+ case KeyEvent.VK_PRINTSCREEN:
+ return (sh) ? 0xFF15/* SysRq */: 0xFF61 /* Print */;
+ case KeyEvent.VK_SCROLL_LOCK:
+ return 0xFF14;
+ case KeyEvent.VK_PAUSE:
+ return (sh) ? 0xFF6B/* Break */: 0xFF13/* Pause */;
+
+ // Text navigation keys
+ case KeyEvent.VK_INSERT:
+ return 0xff63;
+ case KeyEvent.VK_DELETE:
+ return 0xffff;
+ case KeyEvent.VK_HOME:
+ return 0xff50;
+ case KeyEvent.VK_END:
+ return 0xff57;
+ case KeyEvent.VK_PAGE_UP:
+ return 0xff55;
+ case KeyEvent.VK_PAGE_DOWN:
+ return 0xff56;
+
+ // Cursor keys
+ case KeyEvent.VK_LEFT:
+ switch (order.event.getKeyLocation()) {
+ default:
+ case KeyEvent.KEY_LOCATION_LEFT:
+ return 0xff51;
+ case KeyEvent.KEY_LOCATION_NUMPAD:
+ return 0xFF96;
+ }
+ case KeyEvent.VK_UP:
+ switch (order.event.getKeyLocation()) {
+ default:
+ case KeyEvent.KEY_LOCATION_LEFT:
+ return 0xff52;
+ case KeyEvent.KEY_LOCATION_NUMPAD:
+ return 0xFF97;
+ }
+ case KeyEvent.VK_RIGHT:
+ switch (order.event.getKeyLocation()) {
+ default:
+ case KeyEvent.KEY_LOCATION_LEFT:
+ return 0xff53;
+ case KeyEvent.KEY_LOCATION_NUMPAD:
+ return 0xFF98;
+ }
+ case KeyEvent.VK_DOWN:
+ switch (order.event.getKeyLocation()) {
+ default:
+ case KeyEvent.KEY_LOCATION_LEFT:
+ return 0xff54;
+ case KeyEvent.KEY_LOCATION_NUMPAD:
+ return 0xFF99;
+ }
+
+ // Keypad
+ case KeyEvent.VK_NUM_LOCK:
+ if (order.pressed)
+ num = !num;
+ return 0xFF6F;
+ case KeyEvent.VK_DIVIDE:
+ return 0xFFAF;
+ case KeyEvent.VK_MULTIPLY:
+ return 0xFFAA;
+ case KeyEvent.VK_SUBTRACT:
+ return 0xFFAD;
+ case KeyEvent.VK_ADD:
+ return 0xFFAB;
+
+ case KeyEvent.VK_KP_LEFT:
+ return 0xFF96;
+ case KeyEvent.VK_KP_UP:
+ return 0xFF97;
+ case KeyEvent.VK_KP_RIGHT:
+ return 0xFF98;
+ case KeyEvent.VK_KP_DOWN:
+ return 0xFF99;
+
+ case KeyEvent.VK_NUMPAD0:
+ return 0xFFB0;
+ case KeyEvent.VK_NUMPAD1:
+ return 0xFFB1;
+ case KeyEvent.VK_NUMPAD2:
+ return 0xFFB2;
+ case KeyEvent.VK_NUMPAD3:
+ return 0xFFB3;
+ case KeyEvent.VK_NUMPAD4:
+ return 0xFFB4;
+ case KeyEvent.VK_NUMPAD5:
+ return 0xFFB5;
+ case KeyEvent.VK_NUMPAD6:
+ return 0xFFB6;
+ case KeyEvent.VK_NUMPAD7:
+ return 0xFFB7;
+ case KeyEvent.VK_NUMPAD8:
+ return 0xFFB8;
+ case KeyEvent.VK_NUMPAD9:
+ return 0xFFB9;
+ case KeyEvent.VK_DECIMAL:
+ return 0xFFAE;
+
+ default:
+ System.err.println("Key is not mapped: " + order + ".");
+ return ' '; // Space
+ }
+ }
+
+}
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/AwtMouseEventToVncAdapter.java b/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/AwtMouseEventToVncAdapter.java
new file mode 100644
index 00000000000..dd933947503
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/AwtMouseEventToVncAdapter.java
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package vncclient;
+
+import java.awt.event.MouseEvent;
+
+import common.MouseOrder;
+
+import streamer.BaseElement;
+import streamer.ByteBuffer;
+import streamer.Link;
+
+public class AwtMouseEventToVncAdapter extends BaseElement {
+
+ public AwtMouseEventToVncAdapter(String id) {
+ super(id);
+ }
+
+ @Override
+ public void handleData(ByteBuffer buf, Link link) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Data received: " + buf + ".");
+
+ // Get mouse event
+ MouseOrder order = (MouseOrder)buf.getOrder();
+
+ ByteBuffer outBuf = new ByteBuffer(6);
+
+ outBuf.writeByte(RfbConstants.CLIENT_POINTER_EVENT);
+
+ int buttonMask = mapAwtModifiersToVncButtonMask(order.event.getModifiersEx());
+ outBuf.writeByte(buttonMask);
+ outBuf.writeShort(order.event.getX());
+ outBuf.writeShort(order.event.getY());
+
+ pushDataToAllOuts(outBuf);
+ }
+
+ /**
+ * Current state of buttons 1 to 8 are represented by bits 0 to 7 of
+ * button-mask respectively, 0 meaning up, 1 meaning down (pressed). On a
+ * conventional mouse, buttons 1, 2 and 3 correspond to the left, middle and
+ * right buttons on the mouse. On a wheel mouse, each step of the wheel
+ * upwards is represented by a press and release of button 4, and each step
+ * downwards is represented by a press and release of button 5.
+ *
+ * @param modifiers
+ * extended modifiers from AWT mouse event
+ * @return VNC mouse button mask
+ */
+ public static int mapAwtModifiersToVncButtonMask(int modifiers) {
+ int mask = (((modifiers & MouseEvent.BUTTON1_DOWN_MASK) != 0) ? 0x1 : 0) | (((modifiers & MouseEvent.BUTTON2_DOWN_MASK) != 0) ? 0x2 : 0)
+ | (((modifiers & MouseEvent.BUTTON3_DOWN_MASK) != 0) ? 0x4 : 0);
+ return mask;
+ }
+
+}
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/EncodingsMessage.java b/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/EncodingsMessage.java
new file mode 100644
index 00000000000..9ee3566efae
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/EncodingsMessage.java
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package vncclient;
+
+import streamer.BaseElement;
+import streamer.ByteBuffer;
+import streamer.Link;
+
+public class EncodingsMessage extends BaseElement {
+
+ protected final int[] encodings;
+
+ public EncodingsMessage(String id, int[] encodings) {
+ super(id);
+ this.encodings = encodings;
+ declarePads();
+ }
+
+ protected void declarePads() {
+ inputPads.put(STDIN, null);
+ outputPads.put(STDOUT, null);
+ }
+
+ @Override
+ public void handleData(ByteBuffer buf, Link link) {
+ if (buf == null)
+ return;
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Data received: " + buf + ".");
+ buf.unref();
+
+ ByteBuffer outBuf = new ByteBuffer(4 + encodings.length * 4);
+
+ outBuf.writeByte(RfbConstants.CLIENT_SET_ENCODINGS);
+
+ outBuf.writeByte(0);// padding
+
+ outBuf.writeShort(encodings.length);
+
+ for (int i = 0; i < encodings.length; i++) {
+ outBuf.writeInt(encodings[i]);
+ }
+
+ pushDataToAllOuts(outBuf);
+
+ }
+
+}
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/FrameBufferUpdateRequest.java b/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/FrameBufferUpdateRequest.java
new file mode 100644
index 00000000000..c8fab216a62
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/FrameBufferUpdateRequest.java
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package vncclient;
+
+import common.ScreenDescription;
+
+import streamer.BaseElement;
+import streamer.ByteBuffer;
+import streamer.Element;
+import streamer.Link;
+import streamer.MockSink;
+import streamer.MockSource;
+import streamer.Pipeline;
+import streamer.PipelineImpl;
+
+public class FrameBufferUpdateRequest extends BaseElement {
+ // TODO: use object with fields instead of raw values in map
+ public static final String INCREMENTAL_UPDATE = "incremental";
+ public static final String TARGET_X = "x";
+ public static final String TARGET_Y = "y";
+ public static final String WIDTH = "width";
+ public static final String HEIGHT = "height";
+
+ protected ScreenDescription screen;
+
+ public FrameBufferUpdateRequest(String id, ScreenDescription screen) {
+ super(id);
+ this.screen = screen;
+ }
+
+ @Override
+ public void handleData(ByteBuffer buf, Link link) {
+ if (buf == null)
+ return;
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Data received: " + buf + ".");
+
+ Boolean incremental = (Boolean) buf.getMetadata(INCREMENTAL_UPDATE);
+ Integer x = (Integer) buf.getMetadata(TARGET_X);
+ Integer y = (Integer) buf.getMetadata(TARGET_Y);
+ Integer width = (Integer) buf.getMetadata(WIDTH);
+ Integer height = (Integer) buf.getMetadata(HEIGHT);
+ buf.unref();
+
+ // Set default values when parameters are not set
+ if (incremental == null)
+ incremental = false;
+
+ if (x == null)
+ x = 0;
+ if (y == null)
+ y = 0;
+
+ if (width == null)
+ width = screen.getFramebufferWidth();
+ if (height == null)
+ height = screen.getFramebufferHeight();
+
+ ByteBuffer outBuf = new ByteBuffer(10);
+
+ outBuf.writeByte(RfbConstants.CLIENT_FRAMEBUFFER_UPDATE_REQUEST);
+ outBuf.writeByte((incremental) ? RfbConstants.FRAMEBUFFER_INCREMENTAL_UPDATE_REQUEST : RfbConstants.FRAMEBUFFER_FULL_UPDATE_REQUEST);
+ outBuf.writeShort(x);
+ outBuf.writeShort(y);
+ outBuf.writeShort(width);
+ outBuf.writeShort(height);
+
+ if (verbose) {
+ outBuf.putMetadata("sender", this);
+ outBuf.putMetadata("dimensions", width + "x" + height + "@" + x + "x" + y);
+ }
+
+ pushDataToAllOuts(outBuf);
+ }
+
+ public static void main(String args[]) {
+ System.setProperty("streamer.Element.debug", "true");
+
+ ScreenDescription screen = new ScreenDescription();
+ screen.setFramebufferSize(120, 80);
+ Element adapter = new FrameBufferUpdateRequest("renderer", screen);
+
+ Element sink = new MockSink("sink", ByteBuffer.convertByteArraysToByteBuffers(new byte[] {
+ // Request
+ RfbConstants.CLIENT_FRAMEBUFFER_UPDATE_REQUEST,
+ // Full update (redraw area)
+ RfbConstants.FRAMEBUFFER_FULL_UPDATE_REQUEST,
+ // X
+ 0, 1,
+ // Y
+ 0, 2,
+ // Width
+ 0, 3,
+ // Height
+ 0, 4 }));
+
+ ByteBuffer buf = new ByteBuffer(new byte[0]);
+ buf.putMetadata(TARGET_X, 1);
+ buf.putMetadata(TARGET_Y, 2);
+ buf.putMetadata(WIDTH, 3);
+ buf.putMetadata(HEIGHT, 4);
+
+ Element source = new MockSource("source", new ByteBuffer[] { buf });
+
+ Pipeline pipeline = new PipelineImpl("test");
+
+ pipeline.addAndLink(source, adapter, sink);
+ pipeline.runMainLoop("source", STDOUT, false, false);
+
+ }
+
+}
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/RGB888LE32PixelFormatRequest.java b/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/RGB888LE32PixelFormatRequest.java
new file mode 100644
index 00000000000..8c691e72839
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/RGB888LE32PixelFormatRequest.java
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package vncclient;
+
+import common.ScreenDescription;
+
+import streamer.BaseElement;
+import streamer.ByteBuffer;
+import streamer.Link;
+
+public class RGB888LE32PixelFormatRequest extends BaseElement {
+ protected int bitsPerPixel = 32;
+ protected int depth = 24;
+ protected int bigEndianFlag = RfbConstants.LITTLE_ENDIAN;
+ protected int trueColourFlag = RfbConstants.TRUE_COLOR;
+ protected int redMax = 255;
+ protected int greenMax = 255;
+ protected int blueMax = 255;
+ protected int redShift = 0;
+ protected int greenShift = 8;
+ protected int blueShift = 16;
+
+ protected ScreenDescription screen;
+
+ public RGB888LE32PixelFormatRequest(String id, ScreenDescription screen) {
+ super(id);
+ this.screen = screen;
+ }
+
+ protected void declarePads() {
+ inputPads.put(STDIN, null);
+ outputPads.put(STDOUT, null);
+ }
+
+ @Override
+ public void handleData(ByteBuffer buf, Link link) {
+ if (buf == null)
+ return;
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Data received: " + buf + ".");
+ buf.unref();
+
+ ByteBuffer outBuf = new ByteBuffer(20);
+
+ outBuf.writeByte(RfbConstants.CLIENT_SET_PIXEL_FORMAT);
+
+ // Padding
+ outBuf.writeByte(0);
+ outBuf.writeByte(0);
+ outBuf.writeByte(0);
+
+ // Send pixel format
+ outBuf.writeByte(bitsPerPixel);
+ outBuf.writeByte(depth);
+ outBuf.writeByte(bigEndianFlag);
+ outBuf.writeByte(trueColourFlag);
+ outBuf.writeShort(redMax);
+ outBuf.writeShort(greenMax);
+ outBuf.writeShort(blueMax);
+ outBuf.writeByte(redShift);
+ outBuf.writeByte(greenShift);
+ outBuf.writeByte(blueShift);
+
+ // Padding
+ outBuf.writeByte(0);
+ outBuf.writeByte(0);
+ outBuf.writeByte(0);
+
+ screen.setPixelFormat(bitsPerPixel, depth, bigEndianFlag != RfbConstants.LITTLE_ENDIAN, trueColourFlag == RfbConstants.TRUE_COLOR, redMax, greenMax,
+ blueMax, redShift, greenShift, blueShift);
+
+ pushDataToAllOuts(outBuf);
+ }
+
+}
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/RfbConstants.java b/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/RfbConstants.java
new file mode 100644
index 00000000000..c2d63bb7761
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/RfbConstants.java
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package vncclient;
+
+import java.nio.charset.Charset;
+
+public interface RfbConstants {
+
+ public static final String RFB_PROTOCOL_VERSION_MAJOR = "RFB 003.";
+ public static final String VNC_PROTOCOL_VERSION_MINOR = "003";
+ public static final String RFB_PROTOCOL_VERSION = RFB_PROTOCOL_VERSION_MAJOR + VNC_PROTOCOL_VERSION_MINOR;
+
+ /**
+ * Server message types.
+ */
+ final static int SERVER_FRAMEBUFFER_UPDATE = 0, SERVER_SET_COLOURMAP_ENTRIES = 1, SERVER_BELL = 2, SERVER_CUT_TEXT = 3;
+
+ /**
+ * Client message types.
+ */
+ public static final int CLIENT_SET_PIXEL_FORMAT = 0, CLIENT_FIX_COLOURMAP_ENTRIES = 1, CLIENT_SET_ENCODINGS = 2, CLIENT_FRAMEBUFFER_UPDATE_REQUEST = 3,
+ CLIENT_KEYBOARD_EVENT = 4, CLIENT_POINTER_EVENT = 5, CLIENT_CUT_TEXT = 6;
+
+ /**
+ * Server authorization type
+ */
+ public final static int CONNECTION_FAILED = 0, NO_AUTH = 1, VNC_AUTH = 2;
+
+ /**
+ * Server authorization reply.
+ */
+ public final static int VNC_AUTH_OK = 0, VNC_AUTH_FAILED = 1, VNC_AUTH_TOO_MANY = 2;
+
+ /**
+ * Encodings.
+ */
+ public final static int ENCODING_RAW = 0, ENCODING_COPY_RECT = 1, ENCODING_RRE = 2, ENCODING_CO_RRE = 4, ENCODING_HEXTILE = 5, ENCODING_ZRLE = 16;
+
+ /**
+ * Pseudo-encodings.
+ */
+ public final static int ENCODING_CURSOR = -239 /*0xFFFFFF11*/, ENCODING_DESKTOP_SIZE = -223 /*0xFFFFFF21*/;
+
+ /**
+ * Encodings, which we support.
+ */
+ public final static int[] SUPPORTED_ENCODINGS_ARRAY = { ENCODING_RAW, ENCODING_COPY_RECT, ENCODING_DESKTOP_SIZE };
+
+ /**
+ * Frame buffer update request type: update of whole screen or partial update.
+ */
+ public static final int FRAMEBUFFER_FULL_UPDATE_REQUEST = 0, FRAMEBUFFER_INCREMENTAL_UPDATE_REQUEST = 1;
+
+ public static final int KEY_UP = 0, KEY_DOWN = 1;
+
+ public static final int LITTLE_ENDIAN = 0, BIG_ENDIAN = 1;
+
+ public static final int EXCLUSIVE_ACCESS = 0, SHARED_ACCESS = 1;
+
+ public static final int PALETTE = 0, TRUE_COLOR = 1;
+
+ /**
+ * Default 8 bit charset to use when communicating with server.
+ */
+ public static final Charset US_ASCII_CHARSET = Charset.availableCharsets().get("US-ASCII");
+
+ /**
+ * Default 16 bit charset to use when communicating with server.
+ */
+ public static final Charset UCS2_CHARSET = Charset.availableCharsets().get("UTF-16LE");
+}
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/VncClient.java b/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/VncClient.java
new file mode 100644
index 00000000000..2b77e0a9603
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/VncClient.java
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package vncclient;
+
+import streamer.PipelineImpl;
+import streamer.Queue;
+
+import common.AwtBellAdapter;
+import common.AwtCanvasAdapter;
+import common.AwtClipboardAdapter;
+import common.AwtKeyEventSource;
+import common.AwtMouseEventSource;
+import common.BufferedImageCanvas;
+import common.ScreenDescription;
+
+public class VncClient extends PipelineImpl {
+
+ public VncClient(String id, String password, ScreenDescription screen, BufferedImageCanvas canvas) {
+ super(id);
+ assembleVNCPipeline(password, screen, canvas);
+ }
+
+ private void assembleVNCPipeline(String password, ScreenDescription screen, BufferedImageCanvas canvas) {
+
+ AwtMouseEventSource mouseEventSource = new AwtMouseEventSource("mouse");
+ AwtKeyEventSource keyEventSource = new AwtKeyEventSource("keyboard");
+
+ // Subscribe packet sender to various events
+ canvas.addMouseListener(mouseEventSource);
+ canvas.addMouseMotionListener(mouseEventSource);
+ canvas.addKeyListener(keyEventSource);
+
+ add(
+ // Handshake
+
+ // RFB protocol version exchanger
+ new Vnc_3_3_Hello("hello"),
+ // Authenticator
+ new Vnc_3_3_Authentication("auth", password),
+ // Initializer
+ new VncInitializer("init", true, screen),
+
+ new EncodingsMessage("encodings", RfbConstants.SUPPORTED_ENCODINGS_ARRAY),
+
+ new RGB888LE32PixelFormatRequest("pixel_format", screen),
+
+ // Main
+
+ // Packet receiver
+ new VncMessageHandler("message_handler", screen),
+
+ new AwtBellAdapter("bell"),
+
+ new AwtClipboardAdapter("clipboard"),
+
+ new AwtCanvasAdapter("pixels", canvas, screen),
+
+ new Queue("queue"),
+
+ new FrameBufferUpdateRequest("fbur", screen),
+
+ new AwtKeyboardEventToVncAdapter("keyboard_adapter"),
+
+ new AwtMouseEventToVncAdapter("mouse_adapter"),
+
+ mouseEventSource, keyEventSource
+
+ );
+
+ // Link handshake elements
+ link("IN", "hello", "auth", "init", "message_handler");
+ link("hello >otout", "hello< OUT");
+ link("auth >otout", "auth< OUT");
+ link("init >otout", "init< OUT");
+ link("init >encodings", "encodings");
+ link("init >pixel_format", "pixel_format");
+ link("encodings", "encodings< OUT");
+ link("pixel_format", "pixel_format< OUT");
+
+ // Link main elements
+ link("message_handler >bell", "bell");
+ link("message_handler >clipboard", "clipboard");
+ link("message_handler >pixels", "pixels");
+ link("message_handler >fbur", "fbur");
+
+ link("fbur", "fbur< queue");
+ link("keyboard", "keyboard_adapter", "keyboard< queue");
+ link("mouse", "mouse_adapter", "mouse< queue");
+ link("queue", "OUT");
+
+ }
+
+}
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/VncInitializer.java b/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/VncInitializer.java
new file mode 100644
index 00000000000..0882d13ef82
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/VncInitializer.java
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package vncclient;
+
+import streamer.ByteBuffer;
+import streamer.Element;
+import streamer.Link;
+import streamer.MockSink;
+import streamer.MockSource;
+import streamer.OneTimeSwitch;
+import streamer.Pipeline;
+import streamer.PipelineImpl;
+
+import common.ScreenDescription;
+
+public class VncInitializer extends OneTimeSwitch {
+
+ // Pad names
+ public static final String CLIENT_SUPPORTED_ENCODINGS_ADAPTER_PAD = "encodings";
+ public static final String CLIENT_PIXEL_FORMAT_ADAPTER_PAD = "pixel_format";
+
+ protected byte sharedFlag = RfbConstants.EXCLUSIVE_ACCESS;
+
+ /**
+ * Properties of remote screen .
+ */
+ protected ScreenDescription screen;
+
+ public VncInitializer(String id, boolean shared, ScreenDescription screen) {
+ super(id);
+
+ setSharedFlag(shared);
+ this.screen = screen;
+
+ declarePads();
+ }
+
+ @Override
+ protected void declarePads() {
+ super.declarePads();
+ outputPads.put(CLIENT_SUPPORTED_ENCODINGS_ADAPTER_PAD, null);
+ outputPads.put(CLIENT_PIXEL_FORMAT_ADAPTER_PAD, null);
+ }
+
+ public ScreenDescription getScreen() {
+ return screen;
+ }
+
+ public void setScreen(ScreenDescription screen) {
+ this.screen = screen;
+ }
+
+ public void setSharedFlag(boolean shared) {
+ if (shared)
+ sharedFlag = RfbConstants.SHARED_ACCESS;
+ else
+ sharedFlag = RfbConstants.EXCLUSIVE_ACCESS;
+ }
+
+ @Override
+ protected void handleOneTimeData(ByteBuffer buf, Link link) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Data received: " + buf + ".");
+
+ // Server initialization message is at least 24 bytes long + length of
+ // desktop name
+ if (!cap(buf, 24, UNLIMITED, link, false))
+ return;
+
+ // Read server initialization message
+ // Read frame buffer size
+ int framebufferWidth = buf.readUnsignedShort();
+ int framebufferHeight = buf.readUnsignedShort();
+
+ // Read pixel format
+ int bitsPerPixel = buf.readUnsignedByte();
+ int depth = buf.readUnsignedByte();
+
+ int bigEndianFlag = buf.readUnsignedByte();
+ int trueColorFlag = buf.readUnsignedByte();
+
+ int redMax = buf.readUnsignedShort();
+ int greenMax = buf.readUnsignedShort();
+ int blueMax = buf.readUnsignedShort();
+
+ int redShift = buf.readUnsignedByte();
+ int greenShift = buf.readUnsignedByte();
+ int blueShift = buf.readUnsignedByte();
+
+ // Skip padding
+ buf.skipBytes(3);
+
+ // Read desktop name
+ int length = buf.readSignedInt();
+
+ // Consume exactly $length bytes, push back any extra bytes
+ if (!cap(buf, length, length, link, true))
+ return;
+
+ String desktopName = buf.readString(length, RfbConstants.US_ASCII_CHARSET);
+ buf.unref();
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Desktop name: \"" + desktopName + "\", bpp: " + bitsPerPixel + ", depth: " + depth + ", screen size: "
+ + framebufferWidth + "x" + framebufferHeight + ".");
+
+ // Set screen properties
+ screen.setFramebufferSize(framebufferWidth, framebufferHeight);
+ screen.setPixelFormat(bitsPerPixel, depth, bigEndianFlag!=RfbConstants.LITTLE_ENDIAN, trueColorFlag==RfbConstants.TRUE_COLOR, redMax, greenMax, blueMax, redShift, greenShift, blueShift);
+ screen.setDesktopName(desktopName);
+
+ // If sever screen has different parameters than ours, then change it
+ if (!screen.isRGB888_32_LE()) {
+ // Send client pixel format
+ sendClientPixelFormat();
+ }
+
+ // Send encodings supported by client
+ sendSupportedEncodings();
+
+ switchOff();
+
+ }
+
+ @Override
+ protected void onStart() {
+ ByteBuffer buf = new ByteBuffer(new byte[] { sharedFlag });
+ pushDataToOTOut(buf);
+ }
+
+ private void sendClientPixelFormat() {
+ pushDataToPad(CLIENT_PIXEL_FORMAT_ADAPTER_PAD, new ByteBuffer(0));
+ }
+
+ private void sendSupportedEncodings() {
+ pushDataToPad(CLIENT_SUPPORTED_ENCODINGS_ADAPTER_PAD, new ByteBuffer(0));
+ }
+
+ public String toString() {
+ return "VncInit(" + id + ")";
+ }
+
+ /**
+ * Example.
+ */
+ public static void main(String args[]) {
+ // System.setProperty("streamer.Link.debug", "true");
+ System.setProperty("streamer.Element.debug", "true");
+ // System.setProperty("streamer.Pipeline.debug", "true");
+
+ final String desktopName = "test";
+
+ Element source = new MockSource("source") {
+ {
+ bufs = ByteBuffer.convertByteArraysToByteBuffers(
+ // Send screen description
+ new byte[] {
+ // Framebuffer width (short)
+ 0, (byte) 200,
+ // Framebuffer height (short)
+ 0, 100,
+ // Bits per pixel
+ 32,
+ // Depth,
+ 24,
+ // Endianness flag
+ RfbConstants.LITTLE_ENDIAN,
+ // Truecolor flag
+ RfbConstants.TRUE_COLOR,
+ // Red max (short)
+ 0, (byte) 255,
+ // Green max (short)
+ 0, (byte) 255,
+ // Blue max (short)
+ 0, (byte) 255,
+ // Red shift
+ 16,
+ // Green shift
+ 8,
+ // Blue shift
+ 0,
+ // Padding
+ 0, 0, 0,
+ // Desktop name length (int)
+ 0, 0, 0, 4,
+ // Desktop name ("test", 4 bytes)
+ 't', 'e', 's', 't',
+
+ // Tail
+ 1, 2, 3
+
+ },
+ // Tail packet
+ new byte[] { 4, 5, 6 });
+ }
+ };
+
+ ScreenDescription screen = new ScreenDescription();
+ final VncInitializer init = new VncInitializer("init", true, screen);
+ Element initSink = new MockSink("initSink") {
+ {
+ // Expect shared flag
+ bufs = ByteBuffer.convertByteArraysToByteBuffers(new byte[] { RfbConstants.SHARED_ACCESS });
+ }
+ };
+ Element mainSink = new MockSink("mainSink") {
+ {
+ // Expect two tail packets
+ bufs = ByteBuffer.convertByteArraysToByteBuffers(new byte[] { 1, 2, 3 }, new byte[] { 4, 5, 6 });
+ }
+ };
+ ByteBuffer[] emptyBuf = ByteBuffer.convertByteArraysToByteBuffers(new byte[] {});
+ Element encodingsSink = new MockSink("encodings", emptyBuf);
+ Element pixelFormatSink = new MockSink("pixel_format", emptyBuf);
+
+ Pipeline pipeline = new PipelineImpl("test");
+ pipeline.addAndLink(source, init, mainSink);
+ pipeline.add(encodingsSink, pixelFormatSink, initSink);
+ pipeline.link("init >otout", "initSink");
+ pipeline.link("init >" + CLIENT_SUPPORTED_ENCODINGS_ADAPTER_PAD, "encodings");
+ pipeline.link("init >" + CLIENT_PIXEL_FORMAT_ADAPTER_PAD, "pixel_format");
+
+ pipeline.runMainLoop("source", STDOUT, false, false);
+
+ if (!screen.isRGB888_32_LE())
+ System.err.println("Screen description was read incorrectly: " + screen + ".");
+ if (!desktopName.equals(screen.getDesktopName()))
+ System.err.println("Screen desktop name was read incorrectly: \"" + screen.getDesktopName() + "\".");
+
+ }
+}
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/VncMessageHandler.java b/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/VncMessageHandler.java
new file mode 100644
index 00000000000..758000dd323
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/VncMessageHandler.java
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package vncclient;
+
+import streamer.BaseElement;
+import streamer.ByteBuffer;
+import streamer.Element;
+import streamer.Link;
+import streamer.MockSink;
+import streamer.MockSource;
+import streamer.Pipeline;
+import streamer.PipelineImpl;
+
+import common.BitmapOrder;
+import common.BitmapRectangle;
+import common.CopyRectOrder;
+import common.ScreenDescription;
+
+public class VncMessageHandler extends BaseElement {
+ protected ScreenDescription screen = null;
+
+ // Pad names
+ public static final String SERVER_BELL_ADAPTER_PAD = "bell";
+ public static final String SERVER_CLIPBOARD_ADAPTER_PAD = "clipboard";
+ public static final String PIXEL_ADAPTER_PAD = "pixels";
+ public static final String FRAME_BUFFER_UPDATE_REQUEST_ADAPTER_PAD = "fbur";
+
+ // Keys for metadata
+ public static final String CLIPBOARD_CONTENT = "content";
+ public static final String TARGET_X = "x";
+ public static final String TARGET_Y = "y";
+ public static final String WIDTH = "width";
+ public static final String HEIGHT = "height";
+ public static final String SOURCE_X = "srcX";
+ public static final String SOURCE_Y = "srcY";
+ public static final String PIXEL_FORMAT = "pixel_format";
+
+ private static final String NUM_OF_PROCESSED_RECTANGLES = "rects";
+ private static final String SAVED_CURSOR_POSITION = "cursor";
+
+ // Pixel format: RGB888 LE 32
+ public static final String RGB888LE32 = "RGB888LE32";
+
+ public VncMessageHandler(String id, ScreenDescription screen) {
+ super(id);
+ this.screen = screen;
+ declarePads();
+ }
+
+ private void declarePads() {
+ outputPads.put(SERVER_BELL_ADAPTER_PAD, null);
+ outputPads.put(SERVER_BELL_ADAPTER_PAD, null);
+ outputPads.put(SERVER_CLIPBOARD_ADAPTER_PAD, null);
+ outputPads.put(PIXEL_ADAPTER_PAD, null);
+ outputPads.put(FRAME_BUFFER_UPDATE_REQUEST_ADAPTER_PAD, null);
+
+ inputPads.put("stdin", null);
+ }
+
+ @Override
+ public void handleData(ByteBuffer buf, Link link) {
+ if (buf == null)
+ return;
+
+ try {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Data received: " + buf + ".");
+
+ if (!cap(buf, 1, UNLIMITED, link, false))
+ return;
+
+ // Read server message type
+ int messageType = buf.readUnsignedByte();
+
+ // Invoke packet handler by packet type.
+ switch (messageType) {
+
+ case RfbConstants.SERVER_FRAMEBUFFER_UPDATE: {
+ // Handle frame buffer update
+ if (!handleFBU(buf, link))
+ return;
+
+ // Frame buffer update is received and fully processed, send request for
+ // another frame buffer update to server.
+ sendFBUR();
+
+ break;
+ }
+
+ case RfbConstants.SERVER_BELL: {
+ if (!handleBell(buf, link))
+ return;
+ break;
+ }
+
+ case RfbConstants.SERVER_CUT_TEXT: {
+ if (!handleClipboard(buf, link))
+ return;
+ break;
+ }
+
+ default:
+ // TODO: allow to extend functionality
+ throw new RuntimeException("Unknown server packet type: " + messageType + ".");
+ }
+
+ // Cut tail, if any
+ cap(buf, 0, 0, link, true);
+ } finally {
+
+ // Return processed buffer back to pool
+ buf.unref();
+ }
+ }
+
+ private boolean handleClipboard(ByteBuffer buf, Link link) {
+ if (!cap(buf, 3 + 4, UNLIMITED, link, true))
+ return false;
+
+ // Skip padding
+ buf.skipBytes(3);
+
+ // Read text length
+ int length = buf.readSignedInt();
+
+ // We need full string to parse it
+ if (!cap(buf, length, UNLIMITED, link, true))
+ return false;
+
+ String content = buf.readString(length, RfbConstants.US_ASCII_CHARSET);
+
+ // Send content in metadata
+ ByteBuffer outBuf = new ByteBuffer(0);
+ outBuf.putMetadata(CLIPBOARD_CONTENT, content);
+
+ pushDataToPad(SERVER_CLIPBOARD_ADAPTER_PAD, outBuf);
+
+ return true;
+ }
+
+ private boolean handleBell(ByteBuffer buf, Link link) {
+ // Send empty packet to bell adapter to produce bell
+ pushDataToPad(SERVER_BELL_ADAPTER_PAD, new ByteBuffer(0));
+
+ return true;
+ }
+
+ // FIXME: this method is too complex
+ private boolean handleFBU(ByteBuffer buf, Link link) {
+
+ // We need at least 3 bytes here, 1 - padding, 2 - number of rectangles
+ if (!cap(buf, 3, UNLIMITED, link, true))
+ return false;
+
+ buf.skipBytes(1);// Skip padding
+
+ // Read number of rectangles
+ int numberOfRectangles = buf.readUnsignedShort();
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Frame buffer update. Number of rectangles: " + numberOfRectangles + ".");
+
+ // Each rectangle must have header at least, header length is 12 bytes.
+ if (!cap(buf, 12 * numberOfRectangles, UNLIMITED, link, true))
+ return false;
+
+ // For all rectangles
+
+ // Restore saved point, to avoid flickering and performance problems when
+ // frame buffer update is split between few incoming packets.
+ int numberOfProcessedRectangles = (buf.getMetadata(NUM_OF_PROCESSED_RECTANGLES) != null) ? (Integer) buf.getMetadata(NUM_OF_PROCESSED_RECTANGLES) : 0;
+ if (buf.getMetadata(SAVED_CURSOR_POSITION) != null)
+ buf.cursor = (Integer) buf.getMetadata(SAVED_CURSOR_POSITION);
+
+ if (verbose && numberOfProcessedRectangles > 0)
+ System.out.println("[" + this + "] INFO: Restarting from saved point. Number of already processed rectangles: " + numberOfRectangles + ", cursor: "
+ + buf.cursor + ".");
+
+ // For all new rectangles
+ for (int i = numberOfProcessedRectangles; i < numberOfRectangles; i++) {
+
+ // We need coordinates of rectangle (2x4 bytes) and encoding type (4
+ // bytes)
+ if (!cap(buf, 12, UNLIMITED, link, true))
+ return false;
+
+ // Read coordinates of rectangle
+ int x = buf.readUnsignedShort();
+ int y = buf.readUnsignedShort();
+ int width = buf.readUnsignedShort();
+ int height = buf.readUnsignedShort();
+
+ // Read rectangle encoding
+ int encodingType = buf.readSignedInt();
+
+ // Process rectangle
+ switch (encodingType) {
+
+ case RfbConstants.ENCODING_RAW: {
+ if (!handleRawRectangle(buf, link, x, y, width, height))
+ return false;
+ break;
+ }
+
+ case RfbConstants.ENCODING_COPY_RECT: {
+ if (!handleCopyRect(buf, link, x, y, width, height))
+ return false;
+ break;
+ }
+
+ case RfbConstants.ENCODING_DESKTOP_SIZE: {
+ if (!handleScreenSizeChangeRect(buf, link, x, y, width, height))
+ return false;
+ break;
+ }
+
+ default:
+ // TODO: allow to extend functionality
+ throw new RuntimeException("Unsupported ecnoding: " + encodingType + ".");
+ }
+
+ // Update information about processed rectangles to avoid handling of same
+ // rectangle multiple times.
+ // TODO: push back partial rectangle only instead
+ buf.putMetadata(NUM_OF_PROCESSED_RECTANGLES, ++numberOfProcessedRectangles);
+ buf.putMetadata(SAVED_CURSOR_POSITION, buf.cursor);
+ }
+
+ return true;
+ }
+
+ private boolean handleScreenSizeChangeRect(ByteBuffer buf, Link link, int x, int y, int width, int height) {
+ // Remote screen size is changed
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Screen size rect. Width: " + width + ", height: " + height + ".");
+
+ screen.setFramebufferSize(width, height);
+
+ return true;
+ }
+
+ private boolean handleCopyRect(ByteBuffer buf, Link link, int x, int y, int width, int height) {
+ // Copy rectangle from one part of screen to another.
+ // Areas may overlap. Antialiasing may cause visible artifacts.
+
+ // We need 4 bytes with coordinates of source rectangle
+ if (!cap(buf, 4, UNLIMITED, link, true))
+ return false;
+
+ CopyRectOrder order = new CopyRectOrder();
+
+ order.srcX = buf.readUnsignedShort();
+ order.srcY = buf.readUnsignedShort();
+ order.x = x;
+ order.y = y;
+ order.width = width;
+ order.height = height;
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Copy rect. X: " + x + ", y: " + y + ", width: " + width + ", height: " + height + ", srcX: " + order.srcX
+ + ", srcY: " + order.srcY + ".");
+
+ pushDataToPad(PIXEL_ADAPTER_PAD, new ByteBuffer(order));
+
+ return true;
+ }
+
+ private boolean handleRawRectangle(ByteBuffer buf, Link link, int x, int y, int width, int height) {
+ // Raw rectangle is just array of pixels to draw on screen.
+ int rectDataLength = width * height * screen.getBytesPerPixel();
+
+ // We need at least rectDataLength bytes. Extra bytes may contain other
+ // rectangles.
+ if (!cap(buf, rectDataLength, UNLIMITED, link, true))
+ return false;
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Raw rect. X: " + x + ", y: " + y + ", width: " + width + ", height: " + height + ", data length: "
+ + rectDataLength + ".");
+
+ BitmapRectangle rectangle = new BitmapRectangle();
+ rectangle.x = x;
+ rectangle.y = y;
+ rectangle.width = width;
+ rectangle.height = height;
+ rectangle.bufferWidth = width;
+ rectangle.bufferHeight = height;
+ rectangle.bitmapDataStream = buf.readBytes(rectDataLength);
+ rectangle.colorDepth=screen.getColorDeph();
+
+ BitmapOrder order = new BitmapOrder();
+ order.rectangles = new BitmapRectangle[] { rectangle };
+
+ pushDataToPad(PIXEL_ADAPTER_PAD, new ByteBuffer(order));
+ return true;
+ }
+
+ public void onStart() {
+ // Send Frame Buffer Update request
+ sendFBUR();
+ }
+
+ private void sendFBUR() {
+ ByteBuffer buf = new ByteBuffer(0);
+ buf.putMetadata("incremental", true);
+ pushDataToPad(FRAME_BUFFER_UPDATE_REQUEST_ADAPTER_PAD, buf);
+ }
+
+ public String toString() {
+ return "VNCMessageHandler(" + id + ")";
+ }
+
+ /**
+ * Example.
+ */
+ public static void main(String[] args) {
+
+ // System.setProperty("streamer.Link.debug", "true");
+ System.setProperty("streamer.Element.debug", "true");
+ // System.setProperty("streamer.Pipeline.debug", "true");
+
+ Element source = new MockSource("source") {
+ {
+ // Split messages at random boundaries to check "pushback" logic
+ bufs = ByteBuffer.convertByteArraysToByteBuffers(new byte[] {
+ // Message type: server bell
+ RfbConstants.SERVER_BELL,
+
+ // Message type: clipboard text
+ RfbConstants.SERVER_CUT_TEXT,
+ // Padding
+ 0, 0, 0,
+ // Length (test)
+ 0, 0, 0, 4,
+
+ }, new byte[] {
+ // Clipboard text
+ 't', 'e', 's', 't',
+
+ // Message type: frame buffer update
+ RfbConstants.SERVER_FRAMEBUFFER_UPDATE,
+ // Padding
+ 0,
+ // Number of rectangles
+ 0, 3, },
+
+ new byte[] {
+
+ // x, y, width, height: 0x0@4x4
+ 0, 0, 0, 0, 0, 4, 0, 4,
+ // Encoding: desktop size
+ (byte) ((RfbConstants.ENCODING_DESKTOP_SIZE >> 24) & 0xff), (byte) ((RfbConstants.ENCODING_DESKTOP_SIZE >> 16) & 0xff),
+ (byte) ((RfbConstants.ENCODING_DESKTOP_SIZE >> 8) & 0xff), (byte) ((RfbConstants.ENCODING_DESKTOP_SIZE >> 0) & 0xff), },
+
+ new byte[] {
+
+ // x, y, width, height: 0x0@4x4
+ 0, 0, 0, 0, 0, 4, 0, 4,
+ // Encoding: raw rect
+ (byte) ((RfbConstants.ENCODING_RAW >> 24) & 0xff), (byte) ((RfbConstants.ENCODING_RAW >> 16) & 0xff),
+ (byte) ((RfbConstants.ENCODING_RAW >> 8) & 0xff), (byte) ((RfbConstants.ENCODING_RAW >> 0) & 0xff),
+ // Raw pixel data 4x4x1 bpp
+ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, }, new byte[] { 11, 12, 13, 14, 15, 16,
+
+ // x, y, width, height: 0x0@2x2
+ 0, 0, 0, 0, 0, 2, 0, 2,
+ // Encoding: copy rect
+ (byte) ((RfbConstants.ENCODING_COPY_RECT >> 24) & 0xff), (byte) ((RfbConstants.ENCODING_COPY_RECT >> 16) & 0xff),
+ (byte) ((RfbConstants.ENCODING_COPY_RECT >> 8) & 0xff), (byte) ((RfbConstants.ENCODING_COPY_RECT >> 0) & 0xff),
+ // srcX, srcY: 2x2
+ 0, 2, 0, 2, });
+ }
+ };
+
+ ScreenDescription screen = new ScreenDescription() {
+ {
+ this.bytesPerPixel = 1;
+ }
+ };
+
+ final Element handler = new VncMessageHandler("handler", screen);
+
+ ByteBuffer[] emptyBuf = ByteBuffer.convertByteArraysToByteBuffers(new byte[] {});
+ Element fburSink = new MockSink("fbur", ByteBuffer.convertByteArraysToByteBuffers(new byte[] {}, new byte[] {}));
+ Element bellSink = new MockSink("bell", emptyBuf);
+ Element clipboardSink = new MockSink("clipboard", emptyBuf);
+ Element desktopSizeChangeSink = new MockSink("desktop_size", emptyBuf);
+ Element pixelsSink = new MockSink("pixels",
+ ByteBuffer.convertByteArraysToByteBuffers(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, }));
+ Element copyRectSink = new MockSink("copy_rect", emptyBuf);
+
+ Pipeline pipeline = new PipelineImpl("test");
+ pipeline.addAndLink(source, handler);
+ pipeline.add(fburSink, bellSink, clipboardSink, desktopSizeChangeSink, pixelsSink, copyRectSink);
+
+ pipeline.link("handler >" + FRAME_BUFFER_UPDATE_REQUEST_ADAPTER_PAD, "fbur");
+ pipeline.link("handler >" + SERVER_BELL_ADAPTER_PAD, "bell");
+ pipeline.link("handler >" + SERVER_CLIPBOARD_ADAPTER_PAD, "clipboard");
+ pipeline.link("handler >" + PIXEL_ADAPTER_PAD, "pixels");
+
+ pipeline.runMainLoop("source", STDOUT, false, false);
+
+ }
+
+}
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/Vnc_3_3_Authentication.java b/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/Vnc_3_3_Authentication.java
new file mode 100644
index 00000000000..52d9976eece
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/Vnc_3_3_Authentication.java
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package vncclient;
+
+import java.security.spec.KeySpec;
+
+import javax.crypto.Cipher;
+import javax.crypto.SecretKey;
+import javax.crypto.SecretKeyFactory;
+import javax.crypto.spec.DESKeySpec;
+
+import streamer.ByteBuffer;
+import streamer.Element;
+import streamer.FakeSink;
+import streamer.Link;
+import streamer.MockSink;
+import streamer.MockSource;
+import streamer.OneTimeSwitch;
+import streamer.Pipeline;
+import streamer.PipelineImpl;
+
+public class Vnc_3_3_Authentication extends OneTimeSwitch {
+
+ /**
+ * Password to use when authentication is required.
+ */
+ protected String password = null;
+
+ /**
+ * Authentication stage:
+ *
+ * - 0 - challenge received, response must be sent
+ *
- 1 - authentication result received.
+ *
+ */
+ protected int stage = 0;
+
+ public Vnc_3_3_Authentication(String id) {
+ super(id);
+ }
+
+ public Vnc_3_3_Authentication(String id, String password) {
+ super(id);
+ this.password = password;
+ }
+
+ @Override
+ protected void handleOneTimeData(ByteBuffer buf, Link link) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Data received: " + buf + ".");
+
+ switch (stage) {
+ case 0: // Read security with optional challenge and response
+ stage0(buf, link);
+
+ break;
+ case 1: // Read authentication response
+ stage1(buf, link);
+ break;
+ }
+
+ }
+
+ /**
+ * Read security type. If connection type is @see
+ * RfbConstants.CONNECTION_FAILED, then throw exception. If connection type is @see
+ * RfbConstants.NO_AUTH, then switch off this element. If connection type is @see
+ * RfbConstants.VNC_AUTH, then read challenge, send encoded password, and read
+ * authentication response.
+ */
+ private void stage0(ByteBuffer buf, Link link) {
+ // At least 4 bytes are necessary
+ if (!cap(buf, 4, UNLIMITED, link, true))
+ return;
+
+ // Read security type
+ int authType = buf.readSignedInt();
+
+ switch (authType) {
+ case RfbConstants.CONNECTION_FAILED: {
+ // Server forbids to connect. Read reason and throw exception
+
+ int length = buf.readSignedInt();
+ String reason = new String(buf.data, buf.offset, length, RfbConstants.US_ASCII_CHARSET);
+
+ throw new RuntimeException("Authentication to VNC server is failed. Reason: " + reason);
+ }
+
+ case RfbConstants.NO_AUTH: {
+ // Client can connect without authorization. Nothing to do.
+ // Switch off this element from circuit
+ switchOff();
+ break;
+ }
+
+ case RfbConstants.VNC_AUTH: {
+ // Read challenge and generate response
+ responseToChallenge(buf, link);
+ break;
+ }
+
+ default:
+ throw new RuntimeException("Unsupported VNC protocol authorization scheme, scheme code: " + authType + ".");
+ }
+
+ }
+
+ private void responseToChallenge(ByteBuffer buf, Link link) {
+ // Challenge is exactly 16 bytes long
+ if (!cap(buf, 16, 16, link, true))
+ return;
+
+ ByteBuffer challenge = buf.slice(buf.cursor, 16, true);
+ buf.unref();
+
+ // Encode challenge with password
+ ByteBuffer response;
+ try {
+ response = encodePassword(challenge, password);
+ challenge.unref();
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot encrypt client password to send to server: " + e.getMessage());
+ }
+
+ if (verbose) {
+ response.putMetadata("sender", this);
+ }
+
+ // Send encoded challenge
+ nextStage();
+ pushDataToOTOut(response);
+
+ }
+
+ private void nextStage() {
+ stage++;
+
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Next stage: " + stage + ".");
+ }
+
+ /**
+ * Encode password using DES encryption with given challenge.
+ *
+ * @param challenge
+ * a random set of bytes.
+ * @param password
+ * a password
+ * @return DES hash of password and challenge
+ */
+ public ByteBuffer encodePassword(ByteBuffer challenge, String password) {
+ if (challenge.length != 16)
+ throw new RuntimeException("Challenge must be exactly 16 bytes long.");
+
+ // VNC password consist of up to eight ASCII characters.
+ byte[] key = { 0, 0, 0, 0, 0, 0, 0, 0 }; // Padding
+ byte[] passwordAsciiBytes = password.getBytes(RfbConstants.US_ASCII_CHARSET);
+ System.arraycopy(passwordAsciiBytes, 0, key, 0, Math.min(password.length(), 8));
+
+ // Flip bytes (reverse bits) in key
+ for (int i = 0; i < key.length; i++) {
+ key[i] = flipByte(key[i]);
+ }
+
+ try {
+ KeySpec desKeySpec = new DESKeySpec(key);
+ SecretKeyFactory secretKeyFactory = SecretKeyFactory.getInstance("DES");
+ SecretKey secretKey = secretKeyFactory.generateSecret(desKeySpec);
+ Cipher cipher = Cipher.getInstance("DES/ECB/NoPadding");
+ cipher.init(Cipher.ENCRYPT_MODE, secretKey);
+
+ ByteBuffer buf = new ByteBuffer(cipher.doFinal(challenge.data, challenge.offset, challenge.length));
+
+ return buf;
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot encode password.", e);
+ }
+ }
+
+ /**
+ * Reverse bits in byte, so least significant bit will be most significant
+ * bit. E.g. 01001100 will become 00110010.
+ *
+ * See also: http://www.vidarholen.net/contents/junk/vnc.html ,
+ * http://bytecrafter .blogspot.com/2010/09/des-encryption-as-used-in-vnc.html
+ *
+ * @param b
+ * a byte
+ * @return byte in reverse order
+ */
+ private static byte flipByte(byte b) {
+ int b1_8 = (b & 0x1) << 7;
+ int b2_7 = (b & 0x2) << 5;
+ int b3_6 = (b & 0x4) << 3;
+ int b4_5 = (b & 0x8) << 1;
+ int b5_4 = (b & 0x10) >>> 1;
+ int b6_3 = (b & 0x20) >>> 3;
+ int b7_2 = (b & 0x40) >>> 5;
+ int b8_1 = (b & 0x80) >>> 7;
+ byte c = (byte) (b1_8 | b2_7 | b3_6 | b4_5 | b5_4 | b6_3 | b7_2 | b8_1);
+ return c;
+ }
+
+ /**
+ * Read authentication result, send nothing.
+ */
+ private void stage1(ByteBuffer buf, Link link) {
+ // Read authentication response
+ if (!cap(buf, 4, 4, link, false))
+ return;
+
+ int authResult = buf.readSignedInt();
+
+ switch (authResult) {
+ case RfbConstants.VNC_AUTH_OK: {
+ // Nothing to do
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Authentication successfull.");
+ break;
+ }
+
+ case RfbConstants.VNC_AUTH_TOO_MANY:
+ throw new RuntimeException("Connection to VNC server failed: too many wrong attempts.");
+
+ case RfbConstants.VNC_AUTH_FAILED:
+ throw new RuntimeException("Connection to VNC server failed: wrong password.");
+
+ default:
+ throw new RuntimeException("Connection to VNC server failed, reason code: " + authResult);
+ }
+
+ switchOff();
+
+ }
+
+ public String toString() {
+ return "VNC3.3 Authentication(" + id + ")";
+ }
+
+ /**
+ * Example.
+ */
+ public static void main(String args[]) {
+ // System.setProperty("streamer.Link.debug", "true");
+ System.setProperty("streamer.Element.debug", "true");
+ // System.setProperty("streamer.Pipeline.debug", "true");
+
+ final String password = "test";
+
+ Element source = new MockSource("source") {
+ {
+ bufs = ByteBuffer.convertByteArraysToByteBuffers(
+ // Request authentication and send 16 byte challenge
+ new byte[] { 0, 0, 0, RfbConstants.VNC_AUTH, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16 },
+ // Respond to challenge with AUTH_OK
+ new byte[] { 0, 0, 0, RfbConstants.VNC_AUTH_OK });
+ }
+ };
+
+ Element mainSink = new FakeSink("mainSink");
+ final Vnc_3_3_Authentication auth = new Vnc_3_3_Authentication("auth", password);
+ Element initSink = new MockSink("initSink") {
+ {
+ // Expect encoded password
+ bufs = new ByteBuffer[] { auth.encodePassword(new ByteBuffer(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16 }), password) };
+ }
+ };
+
+ Pipeline pipeline = new PipelineImpl("test");
+ pipeline.addAndLink(source, auth, mainSink);
+ pipeline.add(initSink);
+ pipeline.link("auth >otout", "initSink");
+
+ pipeline.runMainLoop("source", STDOUT, false, false);
+
+ }
+}
diff --git a/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/Vnc_3_3_Hello.java b/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/Vnc_3_3_Hello.java
new file mode 100644
index 00000000000..323380b047f
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/main/java/vncclient/Vnc_3_3_Hello.java
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package vncclient;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+
+import streamer.ByteBuffer;
+import streamer.InputStreamSource;
+import streamer.Link;
+import streamer.OneTimeSwitch;
+import streamer.OutputStreamSink;
+import streamer.Pipeline;
+import streamer.PipelineImpl;
+
+/**
+ * VNC server sends hello packet with RFB protocol version, e.g.
+ * "RFB 003.007\n". We need to send response packet with supported protocol
+ * version, e.g. "RFB 003.003\n".
+ */
+public class Vnc_3_3_Hello extends OneTimeSwitch {
+
+ public Vnc_3_3_Hello(String id) {
+ super(id);
+ }
+
+ @Override
+ protected void handleOneTimeData(ByteBuffer buf, Link link) {
+ if (verbose)
+ System.out.println("[" + this + "] INFO: Data received: " + buf + ".");
+
+ // Initial packet is exactly 12 bytes long
+ if (!cap(buf, 12, 12, link, false))
+ return;
+
+ // Read protocol version
+ String rfbProtocol = new String(buf.data, buf.offset, buf.length, RfbConstants.US_ASCII_CHARSET);
+ buf.unref();
+
+ // Server should use RFB protocol 3.x
+ if (!rfbProtocol.contains(RfbConstants.RFB_PROTOCOL_VERSION_MAJOR))
+ throw new RuntimeException("Cannot handshake with VNC server. Unsupported protocol version: \"" + rfbProtocol + "\".");
+
+ // Send response: we support RFB 3.3 only
+ String ourProtocolString = RfbConstants.RFB_PROTOCOL_VERSION + "\n";
+
+ ByteBuffer outBuf = new ByteBuffer(ourProtocolString.getBytes(RfbConstants.US_ASCII_CHARSET));
+
+ if (verbose) {
+ outBuf.putMetadata("sender", this);
+ outBuf.putMetadata("version", RfbConstants.RFB_PROTOCOL_VERSION);
+ }
+
+ pushDataToOTOut(outBuf);
+
+ // Switch off this element from circuit
+ switchOff();
+
+ }
+
+ public String toString() {
+ return "Vnc3.3 Hello(" + id + ")";
+ }
+
+ /**
+ * Example.
+ */
+ public static void main(String args[]) {
+ // System.setProperty("streamer.Link.debug", "true");
+ System.setProperty("streamer.Element.debug", "true");
+ // System.setProperty("streamer.Pipeline.debug", "true");
+
+ InputStream is = new ByteArrayInputStream("RFB 003.007\ntest".getBytes(RfbConstants.US_ASCII_CHARSET));
+ ByteArrayOutputStream initOS = new ByteArrayOutputStream();
+ ByteArrayOutputStream mainOS = new ByteArrayOutputStream();
+ InputStreamSource inputStreamSource = new InputStreamSource("source", is);
+ OutputStreamSink outputStreamSink = new OutputStreamSink("mainSink", mainOS);
+
+ Vnc_3_3_Hello hello = new Vnc_3_3_Hello("hello");
+
+ Pipeline pipeline = new PipelineImpl("test");
+
+ pipeline.addAndLink(inputStreamSource, hello, outputStreamSink);
+ pipeline.add(new OutputStreamSink("initSink", initOS));
+
+ pipeline.link("hello >" + OneTimeSwitch.OTOUT, "initSink");
+
+ pipeline.runMainLoop("source", STDOUT, false, false);
+
+ String initOut = new String(initOS.toByteArray(), RfbConstants.US_ASCII_CHARSET);
+ String mainOut = new String(mainOS.toByteArray(), RfbConstants.US_ASCII_CHARSET);
+
+ if (!"RFB 003.003\n".equals(initOut))
+ System.err.println("Unexpected value for hello response: \"" + initOut + "\".");
+
+ if (!"test".equals(mainOut))
+ System.err.println("Unexpected value for main data: \"" + mainOut + "\".");
+
+ }
+}
diff --git a/services/console-proxy-rdp/rdpconsole/src/test/doc/README.txt b/services/console-proxy-rdp/rdpconsole/src/test/doc/README.txt
new file mode 100644
index 00000000000..dd4168373b1
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/test/doc/README.txt
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+To debug RDP sessions with Network Monitor or Wireshark, you need to
+configure RDP server with custom private key. For Network Monitor
+Decrypt Expert, you also will need to downgrade RDP server TLS protocol
+to version 1.0.
+
+File dev-rdp-config.bat contains instructions to configure RDP to use custom
+key, open firewall, disable NLA, downgrade TLS, and start RDP service.
+
+File rdp.pfx contains custom private key (password: test) for use with
+rdp-config.bat and Network Monitor Decrypt Expert. If you will generate
+your own key, you will need to alter rpd-file.bat to use it
+fingerprints.
+
+File rdp-key.pem contains private key in PEM format for use with
+Wireshark.
diff --git a/services/console-proxy-rdp/rdpconsole/src/test/doc/dev-rdp-config.bat b/services/console-proxy-rdp/rdpconsole/src/test/doc/dev-rdp-config.bat
new file mode 100644
index 00000000000..14a7bbd0f0a
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/test/doc/dev-rdp-config.bat
@@ -0,0 +1,126 @@
+rem Licensed to the Apache Software Foundation (ASF) under one
+rem or more contributor license agreements. See the NOTICE file
+rem distributed with this work for additional information
+rem regarding copyright ownership. The ASF licenses this file
+rem to you under the Apache License, Version 2.0 (the
+rem "License"); you may not use this file except in compliance
+rem with the License. You may obtain a copy of the License at
+rem
+rem http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing,
+rem software distributed under the License is distributed on an
+rem "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+rem KIND, either express or implied. See the License for the
+rem specific language governing permissions and limitations
+rem under the License.
+
+rem
+rem Configure and start RDP service.
+rem Configure RPD service to use custom key instead of autogenerated for Wireshark and Network Monitor Decrypt Expert.
+rem rdp.pfx is necessary because it fingerprints are hardcoded in this script.
+rem
+
+rem Turn off firewall
+
+netsh advfirewall firewall set rule group="Remote Desktop" new enable=yes
+
+rem Enable TS connections
+rem
+rem Windows Registry Editor Version 5.00
+rem
+rem [HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Control\Terminal Server]
+rem "AllowTSConnections"=dword:00000001
+rem "fDenyTSConnections"=dword:00000000
+
+reg add "HKLM\System\CurrentControlSet\Control\Terminal Server" /v "AllowTSConnections" /t REG_DWORD /d 1 /f
+reg add "HKLM\System\CurrentControlSet\Control\Terminal Server" /v "fDenyTSConnections" /t REG_DWORD /d 0 /f
+
+rem Disable RDP NLA
+
+reg add "HKLM\System\CurrentControlSet\Control\Terminal Server\WinStations\RDP-Tcp" /v UserAuthentication /t REG_DWORD /d 0 /f
+
+rem Enable TS service
+
+sc config TermService start=auto
+
+rem Certificate Generation
+
+rem Make self-signed certificate
+
+rem makecert -r -pe -n "CN=%COMPUTERNAME%" -eku 1.3.6.1.5.5.7.3.1 -ss my -sr LocalMachine -sky exchange -sp "Microsoft RSA SChannel Cryptographic Provider" -sy 12
+
+rem Import certificate
+
+certutil -p test -importPFX "Remote Desktop" rdp.pfx
+
+rem Configure RDP server to use certificate:
+
+rem Windows Registry Editor Version 5.00
+rem
+rem [HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Control\Terminal Server\WinStations\RDP-Tcp]
+rem "SSLCertificateSHA1Hash"=hex:c1,70,84,70,bc,56,42,0a,bb,f4,35,35,ba,a6,09,b0,4e,98,4a,47
+reg add "HKLM\System\CurrentControlSet\Control\Terminal Server\WinStations\RDP-Tcp" /v "SSLCertificateSHA1Hash" /t REG_HEX /d "" /f
+
+rem Grant permissions on certificate for everyone
+
+rem certutil -repairstore My "bcb40fb84ac891bd41068fe686864559" D:PAI(A;;GA;;;BA)(A;;GA;;;SY)(A;;GR;;;NS)
+certutil -repairstore "Remote Desktop" "bcb40fb84ac891bd41068fe686864559" D:PAI(A;;GA;;;BA)(A;;GA;;;SY)(A;;GR;;;NS)
+
+rem confirm with
+
+rem certutil -store -v My
+certutil -store -v "Remote Desktop"
+
+rem Disable TLS 1.1 (for Network Monitor Decrypt Expert)
+rem
+rem Windows Registry Editor Version 5.00
+rem
+rem [HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Control\SecurityProviders\SCHANNEL\Protocols\TLS 1.1\Client]
+rem "Enabled"=dword:00000000
+rem "DisabledByDefault"=dword:00000001
+rem
+rem [HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Control\SecurityProviders\SCHANNEL\Protocols\TLS 1.1\Server]
+rem "Enabled"=dword:00000000
+rem "DisabledByDefault"=dword:00000001
+
+reg add "HKLM\System\CurrentControlSet\Control\SecurityProviders\SCHANNEL\Protocols\TLS 1.1\Client" /v "Enabled" /t REG_DWORD /d 0 /f
+reg add "HKLM\System\CurrentControlSet\Control\SecurityProviders\SCHANNEL\Protocols\TLS 1.1\Client" /v "DisabledByDefault" /t REG_DWORD /d 1 /f
+reg add "HKLM\System\CurrentControlSet\Control\SecurityProviders\SCHANNEL\Protocols\TLS 1.1\Server" /v "Enabled" /t REG_DWORD /d 0 /f
+reg add "HKLM\System\CurrentControlSet\Control\SecurityProviders\SCHANNEL\Protocols\TLS 1.1\Server" /v "DisabledByDefault" /t REG_DWORD /d 1 /f
+
+
+rem Disable TLS 1.2 (for Network Monitor Decrypt Expert)
+rem
+rem Windows Registry Editor Version 5.00
+rem
+rem [HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Control\SecurityProviders\SCHANNEL\Protocols\TLS 1.2\Client]
+rem "Enabled"=dword:00000000
+rem "DisabledByDefault"=dword:00000001
+rem
+rem [HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Control\SecurityProviders\SCHANNEL\Protocols\TLS 1.2\Server]
+rem "Enabled"=dword:00000000
+rem "DisabledByDefault"=dword:00000001
+
+reg add "HKLM\System\CurrentControlSet\Control\SecurityProviders\SCHANNEL\Protocols\TLS 1.2\Client" /v "Enabled" /t REG_DWORD /d 0 /f
+reg add "HKLM\System\CurrentControlSet\Control\SecurityProviders\SCHANNEL\Protocols\TLS 1.2\Client" /v "DisabledByDefault" /t REG_DWORD /d 1 /f
+reg add "HKLM\System\CurrentControlSet\Control\SecurityProviders\SCHANNEL\Protocols\TLS 1.2\Server" /v "Enabled" /t REG_DWORD /d 0 /f
+reg add "HKLM\System\CurrentControlSet\Control\SecurityProviders\SCHANNEL\Protocols\TLS 1.2\Server" /v "DisabledByDefault" /t REG_DWORD /d 1 /f
+
+rem Start TS service
+
+net start Termservice
+
+
+rem For Network Monitor Decrypt Expert.
+
+rem Install .Net 3.5
+
+rem dism /online /enable-feature /featurename:NetFx3ServerFeatures
+rem dism /online /enable-feature /featurename:NetFx3
+
+rem PS.
+rem Don't forget to set Windows profile as active in Network Monitor, so SSL traffic branch will appear under
+rem svnchost.exe, so you will be able to decrypt it (don't forget to save and reopen captured traffic to file first).
+rem
+
diff --git a/services/console-proxy-rdp/rdpconsole/src/test/doc/rdp-key.pem b/services/console-proxy-rdp/rdpconsole/src/test/doc/rdp-key.pem
new file mode 100644
index 00000000000..cd050cd475a
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/test/doc/rdp-key.pem
@@ -0,0 +1,23 @@
+Bag Attributes
+ Microsoft Local Key set:
+ localKeyID: 01 00 00 00
+ friendlyName: 8fcf718d-921f-4bfc-9ae4-f63e9c66b6c7
+ Microsoft CSP Name: Microsoft RSA SChannel Cryptographic Provider
+Key Attributes
+ X509v3 Key Usage: 10
+-----BEGIN PRIVATE KEY-----
+MIICdQIBADANBgkqhkiG9w0BAQEFAASCAl8wggJbAgEAAoGBAKrmMXjeoXRn6UFf
+Hmw2HOnT/mEeSWQANzquJnKDBORIjD2rxL3h5FQ/DQUF4gm5JvBll8uWpDX11mVm
+LlAZ2kA8KuJ2JuYEvu/GwuDyrP4D1sOiCwIjJnvmg5DjLB9sll5ohMbjMtiSFm5L
+/YJNXop/pGJucvVzL4t0ZJ1zT2lZAgMBAAECgYAN/OJeuyyQeD+iXweaVTS5OzJ7
+PrBEgM03pQpp9zXx/P6LJUe1c2UUM8bvVGoJ+eW2HNkES/oSN2MLEKAVl7aCLWTe
+7Ejc3JIRB7ZRdNt89w9XvxuRSn87pO082ciMsLvEqqDYahy3BxgI0J/GKbo28Zme
+Z9f9QNCZ8TzbXJbDmwJBANVpBSfi09n5fUy3nGurGxE3chBnyik+Rft63fZp9eiD
+lU5Q4l22+ZUTBChJUtLHztihcb4a4RQX6B4nH5Y1RtMCQQDNAVBKe2VfnFeEoIX7
+ooRnIKIVMxW08GENuJz64otshfH6jRaLL4E/QJLIpoNRFqafyuMkP5x8oZ3uvV1+
+nsujAkAd0Xez9ACP00lLn9gOPzEf/bRFUIsxqg7TLX64AGQoocIJ2ElYuMk0qByL
+mHsnEl33bM9ctZq/WPvIwsSqEzWbAkAcb/k2S8W1LJfLUwUi8dlSAOna7Pou3kVo
+RNqpxrE2faIicl3VMuLH5mo2ITsIDY9RjTBS/+vyMe0Zh/UnMlnnAkAAppLiJ15o
+L3JlVbGRN+4kCP2HVtVRVIl3OlBoVSJZ5qe+s7HowTGurU/iYr1kmWd/C5sU0KPB
+evwz8pdL08vr
+-----END PRIVATE KEY-----
diff --git a/services/console-proxy-rdp/rdpconsole/src/test/java/rdpclient/MockServerTest.java b/services/console-proxy-rdp/rdpconsole/src/test/java/rdpclient/MockServerTest.java
new file mode 100644
index 00000000000..cba01fd6a49
--- /dev/null
+++ b/services/console-proxy-rdp/rdpconsole/src/test/java/rdpclient/MockServerTest.java
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package rdpclient;
+
+import static rdpclient.MockServer.Packet.PacketType.CLIENT;
+import static rdpclient.MockServer.Packet.PacketType.SERVER;
+import static rdpclient.MockServer.Packet.PacketType.UPGRADE_TO_SSL;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLSocketFactory;
+
+import junit.framework.TestCase;
+import rdpclient.MockServer.Packet;
+
+public class MockServerTest extends TestCase {
+
+ public void testIsMockServerCanRespond() throws Exception {
+
+ final byte[] mockClientData = new byte[] { 0x01, 0x02, 0x03 };
+ final byte[] mockServerData = new byte[] { 0x03, 0x02, 0x01 };
+
+ MockServer server = new MockServer(new Packet[] { new Packet("Client hello") {
+ {
+ type = CLIENT;
+ data = mockClientData;
+ }
+ }, new Packet("Server hello") {
+ {
+ type = SERVER;
+ data = mockServerData;
+ }
+ } });
+
+ server.start();
+
+ // Connect to server and send and receive mock data
+
+ Socket socket = SocketFactory.getDefault().createSocket();
+ try {
+ socket.connect(server.getAddress());
+
+ InputStream is = socket.getInputStream();
+ OutputStream os = socket.getOutputStream();
+
+ // Write mock data to server
+ os.write(mockClientData);
+
+ // Read data from server
+ byte actualData[] = new byte[mockServerData.length];
+ int actualDataLength = is.read(actualData);
+
+ // Compare mock data with actual data
+ assertEquals("Unexpected length of actual data read from server.", mockServerData.length, actualDataLength);
+
+ for (int i = 0; i < actualDataLength; i++) {
+ assertEquals("Unexpected byte #" + i + " in response", mockServerData[i], actualData[i]);
+ }
+
+ server.waitUntilShutdowned(1 * 1000 /* up to 1 second */);
+
+ assertNull("Unexpected exception at mock server side.", server.getException());
+ assertTrue("Server is not shutdowned at after conversation.", server.isShutdowned());
+
+ } finally {
+ socket.close();
+ }
+ }
+
+ public void testIsMockServerCanUpgradeConnectionToSsl() throws Exception {
+
+ final byte[] mockClientData1 = new byte[] { 0x01, 0x02, 0x03 };
+ final byte[] mockServerData1 = new byte[] { 0x03, 0x02, 0x01 };
+
+ final byte[] mockClientData2 = new byte[] { 0x02, 0x04, 0x02, 0x03 };
+ final byte[] mockServerData2 = new byte[] { 0x02, 0x02, 0x01, 0x04 };
+
+ MockServer server = new MockServer(new Packet[] { new Packet("Client hello") {
+ {
+ type = CLIENT;
+ data = mockClientData1;
+ }
+ }, new Packet("Server hello") {
+ {
+ type = SERVER;
+ data = mockServerData1;
+ }
+ }, new Packet("Upgrade connection to SSL") {
+ {
+ type = UPGRADE_TO_SSL;
+ }
+ }, new Packet("Client data over SSL") {
+ {
+ type = CLIENT;
+ data = mockClientData2;
+ }
+ }, new Packet("Server data over SSL") {
+ {
+ type = SERVER;
+ data = mockServerData2;
+ }
+ } });
+
+ server.start();
+
+ // Connect to server and send and receive mock data
+
+ Socket socket = SocketFactory.getDefault().createSocket();
+ try {
+ InetSocketAddress address = server.getAddress();
+ socket.connect(address);
+
+ // Send hello data over plain connection
+ {
+ InputStream is = socket.getInputStream();
+ OutputStream os = socket.getOutputStream();
+
+ // Write mock data to server
+ os.write(mockClientData1);
+
+ // Read data from server
+ byte actualData[] = new byte[mockServerData1.length];
+ int actualDataLength = is.read(actualData);
+
+ // Compare mock data with actual data
+ assertEquals("Unexpected length of actual data read from server.", mockServerData1.length, actualDataLength);
+
+ for (int i = 0; i < actualDataLength; i++) {
+ assertEquals("Unexpected byte #" + i + " in response", mockServerData1[i], actualData[i]);
+ }
+ }
+
+ // Upgrade connection to SSL and send mock data
+ {
+ //System.setProperty("javax.net.debug", "ssl");
+
+ final SSLSocketFactory sslSocketFactory = (SSLSocketFactory) SSLSocketFactory.getDefault();
+ SSLSocket sslSocket = (SSLSocket) sslSocketFactory.createSocket(socket, address.getHostString(), address.getPort(), true);
+ sslSocket.setEnabledCipherSuites(sslSocket.getSupportedCipherSuites());
+ sslSocket.startHandshake();
+
+ InputStream is = sslSocket.getInputStream();
+ OutputStream os = sslSocket.getOutputStream();
+
+ // Write mock data to server
+ os.write(mockClientData2);
+
+ // Read data from server
+ byte actualData[] = new byte[mockServerData2.length];
+ int actualDataLength = is.read(actualData);
+
+ // Compare mock data with actual data
+ assertEquals("Unexpected length of actual data read from server.", mockServerData2.length, actualDataLength);
+
+ for (int i = 0; i < actualDataLength; i++) {
+ assertEquals("Unexpected byte #" + i + " in response", mockServerData2[i], actualData[i]);
+ }
+
+ }
+
+ server.waitUntilShutdowned(1 * 1000 /* up to 1 second */);
+
+ assertNull("Unexpected exception at mock server side.", server.getException());
+ assertTrue("Server is not shutdowned at after conversation.", server.isShutdowned());
+ } finally {
+ socket.close();
+ }
+
+ }
+}