summaryrefslogtreecommitdiff
path: root/main
diff options
context:
space:
mode:
authorLi Haoyi <haoyi.sg@gmail.com>2018-05-22 19:34:21 -0700
committerLi Haoyi <haoyi.sg@gmail.com>2018-05-22 19:50:18 -0700
commit6e60ce3d921a9b5e4ced628e2014b707ce2bbbee (patch)
tree05739c9d3c528ac9d0b4950eb25f43204ceaebde /main
parent7358b3c9ecd38cb0d8e268dc7b83156b813a7065 (diff)
downloadmill-6e60ce3d921a9b5e4ced628e2014b707ce2bbbee.tar.gz
mill-6e60ce3d921a9b5e4ced628e2014b707ce2bbbee.tar.bz2
mill-6e60ce3d921a9b5e4ced628e2014b707ce2bbbee.zip
optimize output streaming to allow batch writes
Diffstat (limited to 'main')
-rw-r--r--main/client/src/mill/main/client/MillClientMain.java36
-rw-r--r--main/src/mill/main/MillServerMain.scala36
2 files changed, 34 insertions, 38 deletions
diff --git a/main/client/src/mill/main/client/MillClientMain.java b/main/client/src/mill/main/client/MillClientMain.java
index e3d3ec6e..45bd05ef 100644
--- a/main/client/src/mill/main/client/MillClientMain.java
+++ b/main/client/src/mill/main/client/MillClientMain.java
@@ -162,36 +162,24 @@ class ClientOutputPumper implements Runnable{
public void run() {
byte[] buffer = new byte[1024];
- int state = 0;
boolean running = true;
boolean first = true;
while (running) {
try {
- int n = src.read(buffer);
- first = false;
- if (n == -1) running = false;
- else {
- int i = 0;
- while (i < n) {
- switch (state) {
- case 0:
- state = buffer[i] + 1;
- break;
- case 1:
- dest1.write(buffer[i]);
- state = 0;
- break;
- case 2:
- dest2.write(buffer[i]);
- state = 0;
- break;
- }
-
- i += 1;
+ int quantity0 = (byte)src.read();
+ int quantity = Math.abs(quantity0);
+ int offset = 0;
+ while(offset < quantity){
+ int delta = src.read(buffer, offset, quantity - offset);
+ if (delta == -1) {
+ running = false;
+ break;
+ }else{
+ offset += delta;
}
- dest1.flush();
- dest2.flush();
}
+ if (quantity0 < 0) dest1.write(buffer, 0, quantity);
+ else dest2.write(buffer, 0, quantity);
} catch (IOException e) {
// Win32NamedPipeSocket input stream somehow doesn't return -1,
// instead it throws an IOException whose message contains "ReadFile()".
diff --git a/main/src/mill/main/MillServerMain.scala b/main/src/mill/main/MillServerMain.scala
index 092d958c..24e49fbe 100644
--- a/main/src/mill/main/MillServerMain.scala
+++ b/main/src/mill/main/MillServerMain.scala
@@ -114,7 +114,7 @@ class Server[T](lockBase: String,
def handleRun(clientSocket: Socket) = {
val currentOutErr = clientSocket.getOutputStream
- val stdout = new PrintStream(new ProxyOutputStream(currentOutErr, 0), true)
+ val stdout = new PrintStream(new ProxyOutputStream(currentOutErr, -1), true)
val stderr = new PrintStream(new ProxyOutputStream(currentOutErr, 1), true)
val socketIn = clientSocket.getInputStream
val argStream = new FileInputStream(lockBase + "/run")
@@ -132,7 +132,6 @@ class Server[T](lockBase: String,
@volatile var done = false
@volatile var idle = false
val t = new Thread(() =>
-
try {
val (result, newStateCache) = sm.main0(
args,
@@ -150,8 +149,6 @@ class Server[T](lockBase: String,
java.nio.file.Paths.get(lockBase + "/exitCode"),
(if (result) 0 else 1).toString.getBytes
)
- } catch{case WatchInterrupted(sc: Option[T]) =>
- sm.stateCache = sc
} finally{
done = true
idle = true
@@ -227,16 +224,27 @@ object Server{
}
}
-class ProxyOutputStream(x: => java.io.OutputStream,
+class ProxyOutputStream(out: java.io.OutputStream,
key: Int) extends java.io.OutputStream {
- override def write(b: Int) = x.synchronized{
- x.write(key)
- x.write(b)
+ override def write(b: Int) = out.synchronized{
+ out.write(key)
+ out.write(b)
}
+
+ override def write(b: Array[Byte]): Unit = out.synchronized{
+ write(b, 0, b.length)
+ }
+
+ override def write(b: Array[Byte], off: Int, len: Int): Unit = out.synchronized{
+ var i = off
+ while(i < len){
+ val chunkLength = math.min(len - i, 127)
+ out.write(chunkLength * key)
+ out.write(b, i, chunkLength)
+ i += chunkLength
+ }
+ }
+ override def flush() = out.flush()
+ override def close() = out.close()
}
-class ProxyInputStream(x: => java.io.InputStream) extends java.io.InputStream{
- def read() = x.read()
- override def read(b: Array[Byte], off: Int, len: Int) = x.read(b, off, len)
- override def read(b: Array[Byte]) = x.read(b)
-}
-case class WatchInterrupted[T](stateCache: Option[T]) extends Exception
+