From 6e60ce3d921a9b5e4ced628e2014b707ce2bbbee Mon Sep 17 00:00:00 2001 From: Li Haoyi Date: Tue, 22 May 2018 19:34:21 -0700 Subject: optimize output streaming to allow batch writes --- .../src/mill/main/client/MillClientMain.java | 36 ++++++++-------------- main/src/mill/main/MillServerMain.scala | 36 +++++++++++++--------- 2 files changed, 34 insertions(+), 38 deletions(-) (limited to 'main') diff --git a/main/client/src/mill/main/client/MillClientMain.java b/main/client/src/mill/main/client/MillClientMain.java index e3d3ec6e..45bd05ef 100644 --- a/main/client/src/mill/main/client/MillClientMain.java +++ b/main/client/src/mill/main/client/MillClientMain.java @@ -162,36 +162,24 @@ class ClientOutputPumper implements Runnable{ public void run() { byte[] buffer = new byte[1024]; - int state = 0; boolean running = true; boolean first = true; while (running) { try { - int n = src.read(buffer); - first = false; - if (n == -1) running = false; - else { - int i = 0; - while (i < n) { - switch (state) { - case 0: - state = buffer[i] + 1; - break; - case 1: - dest1.write(buffer[i]); - state = 0; - break; - case 2: - dest2.write(buffer[i]); - state = 0; - break; - } - - i += 1; + int quantity0 = (byte)src.read(); + int quantity = Math.abs(quantity0); + int offset = 0; + while(offset < quantity){ + int delta = src.read(buffer, offset, quantity - offset); + if (delta == -1) { + running = false; + break; + }else{ + offset += delta; } - dest1.flush(); - dest2.flush(); } + if (quantity0 < 0) dest1.write(buffer, 0, quantity); + else dest2.write(buffer, 0, quantity); } catch (IOException e) { // Win32NamedPipeSocket input stream somehow doesn't return -1, // instead it throws an IOException whose message contains "ReadFile()". diff --git a/main/src/mill/main/MillServerMain.scala b/main/src/mill/main/MillServerMain.scala index 092d958c..24e49fbe 100644 --- a/main/src/mill/main/MillServerMain.scala +++ b/main/src/mill/main/MillServerMain.scala @@ -114,7 +114,7 @@ class Server[T](lockBase: String, def handleRun(clientSocket: Socket) = { val currentOutErr = clientSocket.getOutputStream - val stdout = new PrintStream(new ProxyOutputStream(currentOutErr, 0), true) + val stdout = new PrintStream(new ProxyOutputStream(currentOutErr, -1), true) val stderr = new PrintStream(new ProxyOutputStream(currentOutErr, 1), true) val socketIn = clientSocket.getInputStream val argStream = new FileInputStream(lockBase + "/run") @@ -132,7 +132,6 @@ class Server[T](lockBase: String, @volatile var done = false @volatile var idle = false val t = new Thread(() => - try { val (result, newStateCache) = sm.main0( args, @@ -150,8 +149,6 @@ class Server[T](lockBase: String, java.nio.file.Paths.get(lockBase + "/exitCode"), (if (result) 0 else 1).toString.getBytes ) - } catch{case WatchInterrupted(sc: Option[T]) => - sm.stateCache = sc } finally{ done = true idle = true @@ -227,16 +224,27 @@ object Server{ } } -class ProxyOutputStream(x: => java.io.OutputStream, +class ProxyOutputStream(out: java.io.OutputStream, key: Int) extends java.io.OutputStream { - override def write(b: Int) = x.synchronized{ - x.write(key) - x.write(b) + override def write(b: Int) = out.synchronized{ + out.write(key) + out.write(b) } + + override def write(b: Array[Byte]): Unit = out.synchronized{ + write(b, 0, b.length) + } + + override def write(b: Array[Byte], off: Int, len: Int): Unit = out.synchronized{ + var i = off + while(i < len){ + val chunkLength = math.min(len - i, 127) + out.write(chunkLength * key) + out.write(b, i, chunkLength) + i += chunkLength + } + } + override def flush() = out.flush() + override def close() = out.close() } -class ProxyInputStream(x: => java.io.InputStream) extends java.io.InputStream{ - def read() = x.read() - override def read(b: Array[Byte], off: Int, len: Int) = x.read(b, off, len) - override def read(b: Array[Byte]) = x.read(b) -} -case class WatchInterrupted[T](stateCache: Option[T]) extends Exception + -- cgit v1.2.3