summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLi Haoyi <haoyi.sg@gmail.com>2018-02-20 07:57:15 -0800
committerLi Haoyi <haoyi.sg@gmail.com>2018-02-20 07:57:15 -0800
commit41f87cb6df7e8d64bed6c7ae29897cfcf8217fa8 (patch)
treeb658d883bcfb31c78aece854534a253ab925a2b1
parent09b5b5639d4885e2d32b521d97b0870879cf86f6 (diff)
downloadmill-41f87cb6df7e8d64bed6c7ae29897cfcf8217fa8.tar.gz
mill-41f87cb6df7e8d64bed6c7ae29897cfcf8217fa8.tar.bz2
mill-41f87cb6df7e8d64bed6c7ae29897cfcf8217fa8.zip
Simpler, apparently bug-free stdout/stderr multiplexing
-rw-r--r--main/src/mill/ServerClient.scala77
1 files changed, 50 insertions, 27 deletions
diff --git a/main/src/mill/ServerClient.scala b/main/src/mill/ServerClient.scala
index 4a80eef5..5e15314a 100644
--- a/main/src/mill/ServerClient.scala
+++ b/main/src/mill/ServerClient.scala
@@ -30,16 +30,17 @@ object Client{
def main(args: Array[String]): Unit = {
WithLock(1) { lockBase =>
- val start = System.currentTimeMillis()
val inFile = new java.io.File(lockBase + "/stdin")
- val outFile = new java.io.File(lockBase + "/stdout")
- val errFile = new java.io.File(lockBase + "/stderr")
+ val outErrFile = new java.io.File(lockBase + "/stdouterr")
+ val metaFile = new java.io.File(lockBase + "/stdmeta")
val runFile = new java.io.File(lockBase + "/run")
val tmpRunFile = new java.io.File(lockBase + "/run-tmp")
val pidFile = new java.io.File(lockBase + "/pid")
- outFile.createNewFile()
- errFile.createNewFile()
+ outErrFile.delete()
+ metaFile.delete()
+ outErrFile.createNewFile()
+ metaFile.createNewFile()
val f = new FileOutputStream(tmpRunFile)
var i = 0
@@ -51,8 +52,9 @@ object Client{
tmpRunFile.renameTo(runFile)
val in = new FileOutputStream(inFile)
- val out = new FileInputStream(outFile)
- val err = new FileInputStream(errFile)
+ val outErr = new FileInputStream(outErrFile)
+ val meta = new FileInputStream(metaFile)
+
if (!pidFile.exists()){
val selfJar = getClass.getProtectionDomain.getCodeSource.getLocation.toURI.getPath
@@ -72,16 +74,37 @@ object Client{
new java.lang.ProcessBuilder()
.command(l)
.redirectInput(inFile)
- .redirectOutput(outFile)
- .redirectError(errFile)
+ .redirectOutput(outErrFile)
+ .redirectError(outErrFile)
.start()
}
val buffer = new Array[Byte](1024)
+ val metaBuffer = new Array[Byte](1024)
while({
Thread.sleep(1)
while({
- forward(buffer, out, System.out) |
- forward(buffer, err, System.err) |
+ (if (outErr.available() > 0){
+ val outErrN = outErr.read(buffer)
+ if (outErrN > 0) {
+ var metaN = 0
+ while (metaN < outErrN) {
+ val delta = meta.read(metaBuffer, 0, outErrN - metaN)
+ if (delta > 0) {
+ var i = 0
+ while (i < delta) {
+ metaBuffer(i) match {
+ case 0 => System.out.write(buffer(metaN + i))
+ case 1 => System.err.write(buffer(metaN + i))
+ }
+ i += 1
+ }
+ metaN += delta
+ }
+ }
+ }
+
+ true
+ }else false) |
forward(buffer, System.in, in)
})()
@@ -89,8 +112,8 @@ object Client{
})()
inFile.delete()
- outFile.delete()
- errFile.delete()
+ outErrFile.delete()
+ metaFile.delete()
}
}
@@ -104,10 +127,13 @@ object Client{
}
}
-class ProxyOutputStream(x: => java.io.OutputStream) extends java.io.OutputStream {
- def write(b: Int) = x.write(b)
- override def write(b: Array[Byte], off: Int, len: Int) = x.write(b, off, len)
- override def write(b: Array[Byte]) = x.write(b)
+class ProxyOutputStream(x: => java.io.OutputStream,
+ meta: => java.io.OutputStream,
+ key: Int) extends java.io.OutputStream {
+ override def write(b: Int) = Server.synchronized{
+ x.write(b)
+ meta.write(key)
+ }
}
class ProxyInputStream(x: => java.io.InputStream) extends java.io.InputStream{
def read() = x.read()
@@ -122,13 +148,13 @@ object Server{
var lastRun = System.currentTimeMillis()
val pidFile = lockBase.resolve("pid")
var currentIn = System.in
- var currentOut: OutputStream = System.out
- var currentErr: OutputStream = System.err
+ var currentOutErr: OutputStream = System.out
+ var currentMeta: OutputStream = System.err
val raf = new RandomAccessFile(lockBase + "/lock", "rw")
val channel = raf.getChannel
- System.setOut(new PrintStream(new ProxyOutputStream(currentOut), true))
- System.setErr(new PrintStream(new ProxyOutputStream(currentErr), true))
+ System.setOut(new PrintStream(new ProxyOutputStream(currentOutErr, currentMeta, 0), true))
+ System.setErr(new PrintStream(new ProxyOutputStream(currentOutErr, currentMeta, 1), true))
System.setIn(new ProxyInputStream(currentIn))
Files.createFile(pidFile)
var mainRunner = Option.empty[(Cli.Config, MainRunner)]
@@ -136,10 +162,9 @@ object Server{
while (System.currentTimeMillis() - lastRun < 60000) {
if (!Files.exists(runFile)) Thread.sleep(10)
else {
- val start = System.currentTimeMillis()
currentIn = Files.newInputStream(lockBase.resolve("stdin"))
- currentOut = Files.newOutputStream(lockBase.resolve("stdout"))
- currentErr = Files.newOutputStream(lockBase.resolve("stderr"))
+ currentOutErr = Files.newOutputStream(lockBase.resolve("stdouterr"))
+ currentMeta = Files.newOutputStream(lockBase.resolve("stdmeta"))
val args = new String(Files.readAllBytes(runFile)).split('\n')
try {
@@ -156,14 +181,12 @@ object Server{
}
}
)
- val end = System.currentTimeMillis()
mainRunner = mr
} catch{case MainRunner.WatchInterrupted(mr) =>
mainRunner = Some((mr.config, mr))
} finally{
- currentOut.flush()
- currentErr.flush()
+ currentOutErr.flush()
Files.delete(runFile)
lastRun = System.currentTimeMillis()
}