From 41f87cb6df7e8d64bed6c7ae29897cfcf8217fa8 Mon Sep 17 00:00:00 2001 From: Li Haoyi Date: Tue, 20 Feb 2018 07:57:15 -0800 Subject: Simpler, apparently bug-free stdout/stderr multiplexing --- main/src/mill/ServerClient.scala | 77 ++++++++++++++++++++++++++-------------- 1 file changed, 50 insertions(+), 27 deletions(-) (limited to 'main/src') diff --git a/main/src/mill/ServerClient.scala b/main/src/mill/ServerClient.scala index 4a80eef5..5e15314a 100644 --- a/main/src/mill/ServerClient.scala +++ b/main/src/mill/ServerClient.scala @@ -30,16 +30,17 @@ object Client{ def main(args: Array[String]): Unit = { WithLock(1) { lockBase => - val start = System.currentTimeMillis() val inFile = new java.io.File(lockBase + "/stdin") - val outFile = new java.io.File(lockBase + "/stdout") - val errFile = new java.io.File(lockBase + "/stderr") + val outErrFile = new java.io.File(lockBase + "/stdouterr") + val metaFile = new java.io.File(lockBase + "/stdmeta") val runFile = new java.io.File(lockBase + "/run") val tmpRunFile = new java.io.File(lockBase + "/run-tmp") val pidFile = new java.io.File(lockBase + "/pid") - outFile.createNewFile() - errFile.createNewFile() + outErrFile.delete() + metaFile.delete() + outErrFile.createNewFile() + metaFile.createNewFile() val f = new FileOutputStream(tmpRunFile) var i = 0 @@ -51,8 +52,9 @@ object Client{ tmpRunFile.renameTo(runFile) val in = new FileOutputStream(inFile) - val out = new FileInputStream(outFile) - val err = new FileInputStream(errFile) + val outErr = new FileInputStream(outErrFile) + val meta = new FileInputStream(metaFile) + if (!pidFile.exists()){ val selfJar = getClass.getProtectionDomain.getCodeSource.getLocation.toURI.getPath @@ -72,16 +74,37 @@ object Client{ new java.lang.ProcessBuilder() .command(l) .redirectInput(inFile) - .redirectOutput(outFile) - .redirectError(errFile) + .redirectOutput(outErrFile) + .redirectError(outErrFile) .start() } val buffer = new Array[Byte](1024) + val metaBuffer = new Array[Byte](1024) while({ Thread.sleep(1) while({ - forward(buffer, out, System.out) | - forward(buffer, err, System.err) | + (if (outErr.available() > 0){ + val outErrN = outErr.read(buffer) + if (outErrN > 0) { + var metaN = 0 + while (metaN < outErrN) { + val delta = meta.read(metaBuffer, 0, outErrN - metaN) + if (delta > 0) { + var i = 0 + while (i < delta) { + metaBuffer(i) match { + case 0 => System.out.write(buffer(metaN + i)) + case 1 => System.err.write(buffer(metaN + i)) + } + i += 1 + } + metaN += delta + } + } + } + + true + }else false) | forward(buffer, System.in, in) })() @@ -89,8 +112,8 @@ object Client{ })() inFile.delete() - outFile.delete() - errFile.delete() + outErrFile.delete() + metaFile.delete() } } @@ -104,10 +127,13 @@ object Client{ } } -class ProxyOutputStream(x: => java.io.OutputStream) extends java.io.OutputStream { - def write(b: Int) = x.write(b) - override def write(b: Array[Byte], off: Int, len: Int) = x.write(b, off, len) - override def write(b: Array[Byte]) = x.write(b) +class ProxyOutputStream(x: => java.io.OutputStream, + meta: => java.io.OutputStream, + key: Int) extends java.io.OutputStream { + override def write(b: Int) = Server.synchronized{ + x.write(b) + meta.write(key) + } } class ProxyInputStream(x: => java.io.InputStream) extends java.io.InputStream{ def read() = x.read() @@ -122,13 +148,13 @@ object Server{ var lastRun = System.currentTimeMillis() val pidFile = lockBase.resolve("pid") var currentIn = System.in - var currentOut: OutputStream = System.out - var currentErr: OutputStream = System.err + var currentOutErr: OutputStream = System.out + var currentMeta: OutputStream = System.err val raf = new RandomAccessFile(lockBase + "/lock", "rw") val channel = raf.getChannel - System.setOut(new PrintStream(new ProxyOutputStream(currentOut), true)) - System.setErr(new PrintStream(new ProxyOutputStream(currentErr), true)) + System.setOut(new PrintStream(new ProxyOutputStream(currentOutErr, currentMeta, 0), true)) + System.setErr(new PrintStream(new ProxyOutputStream(currentOutErr, currentMeta, 1), true)) System.setIn(new ProxyInputStream(currentIn)) Files.createFile(pidFile) var mainRunner = Option.empty[(Cli.Config, MainRunner)] @@ -136,10 +162,9 @@ object Server{ while (System.currentTimeMillis() - lastRun < 60000) { if (!Files.exists(runFile)) Thread.sleep(10) else { - val start = System.currentTimeMillis() currentIn = Files.newInputStream(lockBase.resolve("stdin")) - currentOut = Files.newOutputStream(lockBase.resolve("stdout")) - currentErr = Files.newOutputStream(lockBase.resolve("stderr")) + currentOutErr = Files.newOutputStream(lockBase.resolve("stdouterr")) + currentMeta = Files.newOutputStream(lockBase.resolve("stdmeta")) val args = new String(Files.readAllBytes(runFile)).split('\n') try { @@ -156,14 +181,12 @@ object Server{ } } ) - val end = System.currentTimeMillis() mainRunner = mr } catch{case MainRunner.WatchInterrupted(mr) => mainRunner = Some((mr.config, mr)) } finally{ - currentOut.flush() - currentErr.flush() + currentOutErr.flush() Files.delete(runFile) lastRun = System.currentTimeMillis() } -- cgit v1.2.3