From 6e60ce3d921a9b5e4ced628e2014b707ce2bbbee Mon Sep 17 00:00:00 2001 From: Li Haoyi Date: Tue, 22 May 2018 19:34:21 -0700 Subject: optimize output streaming to allow batch writes --- core/src/mill/util/Logger.scala | 66 ++++++++++++++++------ .../src/mill/main/client/MillClientMain.java | 36 ++++-------- main/src/mill/main/MillServerMain.scala | 36 +++++++----- 3 files changed, 84 insertions(+), 54 deletions(-) diff --git a/core/src/mill/util/Logger.scala b/core/src/mill/util/Logger.scala index dfa7964a..ec8de132 100644 --- a/core/src/mill/util/Logger.scala +++ b/core/src/mill/util/Logger.scala @@ -145,24 +145,58 @@ case class FileLogger(colored: Boolean, file: Path) extends Logger { } } -case class MultiLogger(colored: Boolean, streams: Logger*) extends Logger { - lazy val outputStream: PrintStream = - new PrintStream(b => streams.foreach(_.outputStream.write(b))) { - override def flush() = streams.foreach(_.outputStream.flush()) - override def close() = streams.foreach(_.outputStream.close()) - } - lazy val errorStream: PrintStream = - new PrintStream(b => streams.foreach(_.outputStream.write(b))) { - override def flush() = streams.foreach(_.outputStream.flush()) - override def close() = streams.foreach(_.outputStream.close()) - } - lazy val inStream = streams.collect{case t: PrintLogger => t}.headOption match{ + + +class MultiStream(stream1: OutputStream, stream2: OutputStream) extends PrintStream(new OutputStream { + def write(b: Int): Unit = { + stream1.write(b) + stream2.write(b) + } + override def write(b: Array[Byte]): Unit = { + stream1.write(b) + stream2.write(b) + } + override def write(b: Array[Byte], off: Int, len: Int) = { + stream1.write(b, off, len) + stream2.write(b, off, len) + } + override def flush() = { + stream1.flush() + stream2.flush() + } + override def close() = { + stream1.close() + stream2.close() + } +}) + +case class MultiLogger(colored: Boolean, logger1: Logger, logger2: Logger) extends Logger { + + + lazy val outputStream: PrintStream = new MultiStream(logger1.outputStream, logger2.outputStream) + + lazy val errorStream: PrintStream = new MultiStream(logger1.errorStream, logger2.errorStream) + + lazy val inStream = Seq(logger1, logger2).collectFirst{case t: PrintLogger => t} match{ case Some(x) => x.inStream case None => new ByteArrayInputStream(Array()) } - def info(s: String) = streams.foreach(_.info(s)) - def error(s: String) = streams.foreach(_.error(s)) - def ticker(s: String) = streams.foreach(_.ticker(s)) - override def close() = streams.foreach(_.close()) + def info(s: String) = { + logger1.info(s) + logger2.info(s) + } + def error(s: String) = { + logger1.error(s) + logger2.error(s) + } + def ticker(s: String) = { + logger1.ticker(s) + logger2.ticker(s) + } + + override def close() = { + logger1.close() + logger2.close() + } } \ No newline at end of file diff --git a/main/client/src/mill/main/client/MillClientMain.java b/main/client/src/mill/main/client/MillClientMain.java index e3d3ec6e..45bd05ef 100644 --- a/main/client/src/mill/main/client/MillClientMain.java +++ b/main/client/src/mill/main/client/MillClientMain.java @@ -162,36 +162,24 @@ class ClientOutputPumper implements Runnable{ public void run() { byte[] buffer = new byte[1024]; - int state = 0; boolean running = true; boolean first = true; while (running) { try { - int n = src.read(buffer); - first = false; - if (n == -1) running = false; - else { - int i = 0; - while (i < n) { - switch (state) { - case 0: - state = buffer[i] + 1; - break; - case 1: - dest1.write(buffer[i]); - state = 0; - break; - case 2: - dest2.write(buffer[i]); - state = 0; - break; - } - - i += 1; + int quantity0 = (byte)src.read(); + int quantity = Math.abs(quantity0); + int offset = 0; + while(offset < quantity){ + int delta = src.read(buffer, offset, quantity - offset); + if (delta == -1) { + running = false; + break; + }else{ + offset += delta; } - dest1.flush(); - dest2.flush(); } + if (quantity0 < 0) dest1.write(buffer, 0, quantity); + else dest2.write(buffer, 0, quantity); } catch (IOException e) { // Win32NamedPipeSocket input stream somehow doesn't return -1, // instead it throws an IOException whose message contains "ReadFile()". diff --git a/main/src/mill/main/MillServerMain.scala b/main/src/mill/main/MillServerMain.scala index 092d958c..24e49fbe 100644 --- a/main/src/mill/main/MillServerMain.scala +++ b/main/src/mill/main/MillServerMain.scala @@ -114,7 +114,7 @@ class Server[T](lockBase: String, def handleRun(clientSocket: Socket) = { val currentOutErr = clientSocket.getOutputStream - val stdout = new PrintStream(new ProxyOutputStream(currentOutErr, 0), true) + val stdout = new PrintStream(new ProxyOutputStream(currentOutErr, -1), true) val stderr = new PrintStream(new ProxyOutputStream(currentOutErr, 1), true) val socketIn = clientSocket.getInputStream val argStream = new FileInputStream(lockBase + "/run") @@ -132,7 +132,6 @@ class Server[T](lockBase: String, @volatile var done = false @volatile var idle = false val t = new Thread(() => - try { val (result, newStateCache) = sm.main0( args, @@ -150,8 +149,6 @@ class Server[T](lockBase: String, java.nio.file.Paths.get(lockBase + "/exitCode"), (if (result) 0 else 1).toString.getBytes ) - } catch{case WatchInterrupted(sc: Option[T]) => - sm.stateCache = sc } finally{ done = true idle = true @@ -227,16 +224,27 @@ object Server{ } } -class ProxyOutputStream(x: => java.io.OutputStream, +class ProxyOutputStream(out: java.io.OutputStream, key: Int) extends java.io.OutputStream { - override def write(b: Int) = x.synchronized{ - x.write(key) - x.write(b) + override def write(b: Int) = out.synchronized{ + out.write(key) + out.write(b) } + + override def write(b: Array[Byte]): Unit = out.synchronized{ + write(b, 0, b.length) + } + + override def write(b: Array[Byte], off: Int, len: Int): Unit = out.synchronized{ + var i = off + while(i < len){ + val chunkLength = math.min(len - i, 127) + out.write(chunkLength * key) + out.write(b, i, chunkLength) + i += chunkLength + } + } + override def flush() = out.flush() + override def close() = out.close() } -class ProxyInputStream(x: => java.io.InputStream) extends java.io.InputStream{ - def read() = x.read() - override def read(b: Array[Byte], off: Int, len: Int) = x.read(b, off, len) - override def read(b: Array[Byte]) = x.read(b) -} -case class WatchInterrupted[T](stateCache: Option[T]) extends Exception + -- cgit v1.2.3