From 6e60ce3d921a9b5e4ced628e2014b707ce2bbbee Mon Sep 17 00:00:00 2001 From: Li Haoyi Date: Tue, 22 May 2018 19:34:21 -0700 Subject: optimize output streaming to allow batch writes --- main/src/mill/main/MillServerMain.scala | 36 ++++++++++++++++++++------------- 1 file changed, 22 insertions(+), 14 deletions(-) (limited to 'main/src') diff --git a/main/src/mill/main/MillServerMain.scala b/main/src/mill/main/MillServerMain.scala index 092d958c..24e49fbe 100644 --- a/main/src/mill/main/MillServerMain.scala +++ b/main/src/mill/main/MillServerMain.scala @@ -114,7 +114,7 @@ class Server[T](lockBase: String, def handleRun(clientSocket: Socket) = { val currentOutErr = clientSocket.getOutputStream - val stdout = new PrintStream(new ProxyOutputStream(currentOutErr, 0), true) + val stdout = new PrintStream(new ProxyOutputStream(currentOutErr, -1), true) val stderr = new PrintStream(new ProxyOutputStream(currentOutErr, 1), true) val socketIn = clientSocket.getInputStream val argStream = new FileInputStream(lockBase + "/run") @@ -132,7 +132,6 @@ class Server[T](lockBase: String, @volatile var done = false @volatile var idle = false val t = new Thread(() => - try { val (result, newStateCache) = sm.main0( args, @@ -150,8 +149,6 @@ class Server[T](lockBase: String, java.nio.file.Paths.get(lockBase + "/exitCode"), (if (result) 0 else 1).toString.getBytes ) - } catch{case WatchInterrupted(sc: Option[T]) => - sm.stateCache = sc } finally{ done = true idle = true @@ -227,16 +224,27 @@ object Server{ } } -class ProxyOutputStream(x: => java.io.OutputStream, +class ProxyOutputStream(out: java.io.OutputStream, key: Int) extends java.io.OutputStream { - override def write(b: Int) = x.synchronized{ - x.write(key) - x.write(b) + override def write(b: Int) = out.synchronized{ + out.write(key) + out.write(b) } + + override def write(b: Array[Byte]): Unit = out.synchronized{ + write(b, 0, b.length) + } + + override def write(b: Array[Byte], off: Int, len: Int): Unit = out.synchronized{ + var i = off + while(i < len){ + val chunkLength = math.min(len - i, 127) + out.write(chunkLength * key) + out.write(b, i, chunkLength) + i += chunkLength + } + } + override def flush() = out.flush() + override def close() = out.close() } -class ProxyInputStream(x: => java.io.InputStream) extends java.io.InputStream{ - def read() = x.read() - override def read(b: Array[Byte], off: Int, len: Int) = x.read(b, off, len) - override def read(b: Array[Byte]) = x.read(b) -} -case class WatchInterrupted[T](stateCache: Option[T]) extends Exception + -- cgit v1.2.3