From c04bfa1c0ee5a51ef5f63ade8e63d1f55f53fa3e Mon Sep 17 00:00:00 2001 From: Li Haoyi Date: Tue, 22 May 2018 21:46:45 -0700 Subject: Migrate `ProxyOutputStream` to the `main.client` module, add unit/fuzz tests to make sure it works --- .../src/mill/main/client/MillClientMain.java | 52 +------------------ .../src/mill/main/client/ProxyOutputStream.java | 34 ++++++++++++ .../src/mill/main/client/ProxyStreamPumper.java | 60 ++++++++++++++++++++++ 3 files changed, 95 insertions(+), 51 deletions(-) create mode 100644 main/client/src/mill/main/client/ProxyOutputStream.java create mode 100644 main/client/src/mill/main/client/ProxyStreamPumper.java (limited to 'main/client/src') diff --git a/main/client/src/mill/main/client/MillClientMain.java b/main/client/src/mill/main/client/MillClientMain.java index 45bd05ef..3ec4f8b0 100644 --- a/main/client/src/mill/main/client/MillClientMain.java +++ b/main/client/src/mill/main/client/MillClientMain.java @@ -129,7 +129,7 @@ public class MillClientMain { InputStream outErr = ioSocket.getInputStream(); OutputStream in = ioSocket.getOutputStream(); - ClientOutputPumper outPump = new ClientOutputPumper(outErr, stdout, stderr); + ProxyStreamPumper outPump = new ProxyStreamPumper(outErr, stdout, stderr); InputPumper inPump = new InputPumper(stdin, in, true); Thread outThread = new Thread(outPump); outThread.setDaemon(true); @@ -149,53 +149,3 @@ public class MillClientMain { } } } - -class ClientOutputPumper implements Runnable{ - private InputStream src; - private OutputStream dest1; - private OutputStream dest2; - public ClientOutputPumper(InputStream src, OutputStream dest1, OutputStream dest2){ - this.src = src; - this.dest1 = dest1; - this.dest2 = dest2; - } - - public void run() { - byte[] buffer = new byte[1024]; - boolean running = true; - boolean first = true; - while (running) { - try { - int quantity0 = (byte)src.read(); - int quantity = Math.abs(quantity0); - int offset = 0; - while(offset < quantity){ - int delta = src.read(buffer, offset, quantity - offset); - if (delta == -1) { - running = false; - break; - }else{ - offset += delta; - } - } - if (quantity0 < 0) dest1.write(buffer, 0, quantity); - else dest2.write(buffer, 0, quantity); - } catch (IOException e) { - // Win32NamedPipeSocket input stream somehow doesn't return -1, - // instead it throws an IOException whose message contains "ReadFile()". - // However, if it throws an IOException before ever reading some bytes, - // it could not connect to the server, so exit. - if (Util.isWindows && e.getMessage().contains("ReadFile()")) { - if (first) { - System.err.println("Failed to connect to server"); - System.exit(1); - } else running = false; - } else { - e.printStackTrace(); - System.exit(1); - } - } - } - } - -} diff --git a/main/client/src/mill/main/client/ProxyOutputStream.java b/main/client/src/mill/main/client/ProxyOutputStream.java new file mode 100644 index 00000000..339e0150 --- /dev/null +++ b/main/client/src/mill/main/client/ProxyOutputStream.java @@ -0,0 +1,34 @@ +package mill.main.client; + +import java.io.IOException; + +public class ProxyOutputStream extends java.io.OutputStream { + private java.io.OutputStream out; + private int key; + public ProxyOutputStream(java.io.OutputStream out, int key){ + this.out = out; + this.key = key; + } + @Override synchronized public void write(int b) throws IOException { + out.write(key); + out.write(b); + } + @Override synchronized public void write(byte[] b) throws IOException { + write(b, 0, b.length); + } + @Override synchronized public void write(byte[] b, int off, int len) throws IOException { + int i = 0; + while(i < len && i + off < b.length){ + int chunkLength = Math.min(len - i, 127); + out.write(chunkLength * key); + out.write(b, off + i, Math.min(b.length - off - i, chunkLength)); + i += chunkLength; + } + } + @Override public void flush() throws IOException { + out.flush(); + } + @Override public void close() throws IOException { + out.close(); + } +} diff --git a/main/client/src/mill/main/client/ProxyStreamPumper.java b/main/client/src/mill/main/client/ProxyStreamPumper.java new file mode 100644 index 00000000..977323f3 --- /dev/null +++ b/main/client/src/mill/main/client/ProxyStreamPumper.java @@ -0,0 +1,60 @@ +package mill.main.client; + + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +public class ProxyStreamPumper implements Runnable{ + private InputStream src; + private OutputStream dest1; + private OutputStream dest2; + public ProxyStreamPumper(InputStream src, OutputStream dest1, OutputStream dest2){ + this.src = src; + this.dest1 = dest1; + this.dest2 = dest2; + } + + public void run() { + byte[] buffer = new byte[1024]; + boolean running = true; + boolean first = true; + while (running) { + try { + int quantity0 = (byte)src.read(); + int quantity = Math.abs(quantity0); + int offset = 0; + int delta = -1; + while(offset < quantity){ + delta = src.read(buffer, offset, quantity - offset); + if (delta == -1) { + running = false; + break; + }else{ + offset += delta; + } + } + + if (delta != -1){ + if (quantity0 > 0) dest1.write(buffer, 0, offset); + else dest2.write(buffer, 0, offset); + } + } catch (IOException e) { + // Win32NamedPipeSocket input stream somehow doesn't return -1, + // instead it throws an IOException whose message contains "ReadFile()". + // However, if it throws an IOException before ever reading some bytes, + // it could not connect to the server, so exit. + if (Util.isWindows && e.getMessage().contains("ReadFile()")) { + if (first) { + System.err.println("Failed to connect to server"); + System.exit(1); + } else running = false; + } else { + e.printStackTrace(); + System.exit(1); + } + } + } + } + +} -- cgit v1.2.3