summaryrefslogtreecommitdiff
path: root/main/src
diff options
context:
space:
mode:
authorLi Haoyi <haoyi.sg@gmail.com>2018-05-22 19:34:21 -0700
committerLi Haoyi <haoyi.sg@gmail.com>2018-05-22 19:50:18 -0700
commit6e60ce3d921a9b5e4ced628e2014b707ce2bbbee (patch)
tree05739c9d3c528ac9d0b4950eb25f43204ceaebde /main/src
parent7358b3c9ecd38cb0d8e268dc7b83156b813a7065 (diff)
downloadmill-6e60ce3d921a9b5e4ced628e2014b707ce2bbbee.tar.gz
mill-6e60ce3d921a9b5e4ced628e2014b707ce2bbbee.tar.bz2
mill-6e60ce3d921a9b5e4ced628e2014b707ce2bbbee.zip
optimize output streaming to allow batch writes
Diffstat (limited to 'main/src')
-rw-r--r--main/src/mill/main/MillServerMain.scala36
1 files changed, 22 insertions, 14 deletions
diff --git a/main/src/mill/main/MillServerMain.scala b/main/src/mill/main/MillServerMain.scala
index 092d958c..24e49fbe 100644
--- a/main/src/mill/main/MillServerMain.scala
+++ b/main/src/mill/main/MillServerMain.scala
@@ -114,7 +114,7 @@ class Server[T](lockBase: String,
def handleRun(clientSocket: Socket) = {
val currentOutErr = clientSocket.getOutputStream
- val stdout = new PrintStream(new ProxyOutputStream(currentOutErr, 0), true)
+ val stdout = new PrintStream(new ProxyOutputStream(currentOutErr, -1), true)
val stderr = new PrintStream(new ProxyOutputStream(currentOutErr, 1), true)
val socketIn = clientSocket.getInputStream
val argStream = new FileInputStream(lockBase + "/run")
@@ -132,7 +132,6 @@ class Server[T](lockBase: String,
@volatile var done = false
@volatile var idle = false
val t = new Thread(() =>
-
try {
val (result, newStateCache) = sm.main0(
args,
@@ -150,8 +149,6 @@ class Server[T](lockBase: String,
java.nio.file.Paths.get(lockBase + "/exitCode"),
(if (result) 0 else 1).toString.getBytes
)
- } catch{case WatchInterrupted(sc: Option[T]) =>
- sm.stateCache = sc
} finally{
done = true
idle = true
@@ -227,16 +224,27 @@ object Server{
}
}
-class ProxyOutputStream(x: => java.io.OutputStream,
+class ProxyOutputStream(out: java.io.OutputStream,
key: Int) extends java.io.OutputStream {
- override def write(b: Int) = x.synchronized{
- x.write(key)
- x.write(b)
+ override def write(b: Int) = out.synchronized{
+ out.write(key)
+ out.write(b)
}
+
+ override def write(b: Array[Byte]): Unit = out.synchronized{
+ write(b, 0, b.length)
+ }
+
+ override def write(b: Array[Byte], off: Int, len: Int): Unit = out.synchronized{
+ var i = off
+ while(i < len){
+ val chunkLength = math.min(len - i, 127)
+ out.write(chunkLength * key)
+ out.write(b, i, chunkLength)
+ i += chunkLength
+ }
+ }
+ override def flush() = out.flush()
+ override def close() = out.close()
}
-class ProxyInputStream(x: => java.io.InputStream) extends java.io.InputStream{
- def read() = x.read()
- override def read(b: Array[Byte], off: Int, len: Int) = x.read(b, off, len)
- override def read(b: Array[Byte]) = x.read(b)
-}
-case class WatchInterrupted[T](stateCache: Option[T]) extends Exception
+