From 3e66c8630be00c01f2cb7720e988a94af338de18 Mon Sep 17 00:00:00 2001 From: Li Haoyi Date: Mon, 19 Feb 2018 21:32:33 -0800 Subject: Successful management of hot evaluators in the mill.Server, client latency down to 600ms or so --- main/src/mill/Main.scala | 25 +++++--- main/src/mill/ServerClient.scala | 117 +++++++++++++++++++++++------------- main/src/mill/main/MainRunner.scala | 5 +- 3 files changed, 95 insertions(+), 52 deletions(-) diff --git a/main/src/mill/Main.scala b/main/src/mill/Main.scala index 9f95c3c2..b281f573 100644 --- a/main/src/mill/Main.scala +++ b/main/src/mill/Main.scala @@ -1,8 +1,12 @@ package mill +import java.io.PrintStream + +import ammonite.main.Cli import ammonite.main.Cli.{formatBlock, genericSignature, replSignature} import ammonite.ops._ import ammonite.util.Util +import mill.main.MainRunner object Main { case class Config(home: ammonite.ops.Path = pwd/'out/'ammonite, @@ -12,11 +16,10 @@ object Main { watch: Boolean = false) def main(args: Array[String]): Unit = { - val result = main0(args) + val (result, _) = main0(args, None) System.exit(if(result) 0 else 1) } - def main0(args: Array[String]): Boolean = { - + def main0(args: Array[String], mainRunner: Option[(Cli.Config, MainRunner)]): (Boolean, Option[(Cli.Config, MainRunner)]) = { import ammonite.main.Cli val removed = Set("predef-code", "home", "no-home-predef") @@ -28,7 +31,7 @@ object Main { ) match{ case Left(msg) => System.err.println(msg) - false + (false, None) case Right((cliConfig, _)) if cliConfig.help => val leftMargin = millArgSignature.map(ammonite.main.Cli.showArg(_).length).max + 2 System.out.println( @@ -37,7 +40,7 @@ object Main { | |${formatBlock(millArgSignature, leftMargin).mkString(Util.newLine)}""".stripMargin ) - true + (true, None) case Right((cliConfig, leftoverArgs)) => val repl = leftoverArgs.isEmpty @@ -61,13 +64,19 @@ object Main { val runner = new mill.main.MainRunner( config.copy(home = pwd / "out" / ".ammonite"), - System.out, System.err, System.in + System.out, System.err, System.in, + mainRunner match{ + case Some((c, mr)) if c.copy(storageBackend = null) == cliConfig.copy(storageBackend = null) => + mr.lastEvaluator + case _ => None + } ) + if (repl){ runner.printInfo("Loading...") - runner.watchLoop(isRepl = true, printing = false, _.run()) + (runner.watchLoop(isRepl = true, printing = false, _.run()), Some(cliConfig -> runner)) } else { - runner.runScript(pwd / "build.sc", leftoverArgs) + (runner.runScript(pwd / "build.sc", leftoverArgs), Some(cliConfig -> runner)) } } } diff --git a/main/src/mill/ServerClient.scala b/main/src/mill/ServerClient.scala index a1816c55..9fa8f70a 100644 --- a/main/src/mill/ServerClient.scala +++ b/main/src/mill/ServerClient.scala @@ -1,13 +1,15 @@ package mill -import java.io.{InputStream, OutputStream, PrintStream, RandomAccessFile} -import java.nio.file.{Files, Path, Paths} +import java.io._ import java.util + +import ammonite.main.Cli +import mill.main.MainRunner object Client{ - def WithLock[T](index: Int)(f: Path => T): T = { - val lockFile = Paths.get("out/mill-worker-" + index + "/lock") - Files.createDirectories(lockFile.getParent) - val raf = new RandomAccessFile(lockFile.toFile, "rw") + def WithLock[T](index: Int)(f: String => T): T = { + val lockBase = "out/mill-worker-" + index + new java.io.File(lockBase).mkdirs() + val raf = new RandomAccessFile(lockBase+ "/lock", "rw") val channel = raf.getChannel channel.tryLock() match{ case null => @@ -16,7 +18,7 @@ object Client{ if (index < 5) WithLock(index + 1)(f) else throw new Exception("Reached max process limit: " + 5) case locked => - try f(lockFile.getParent) + try f(lockBase) finally{ locked.release() raf.close() @@ -27,22 +29,31 @@ object Client{ def main(args: Array[String]): Unit = { WithLock(1) { lockBase => - val inFile = lockBase.resolve("stdin") - val outFile = lockBase.resolve("stdout") - val errFile = lockBase.resolve("stderr") - val runFile = lockBase.resolve("run") - val tmpRunFile = lockBase.resolve("run-tmp") - - Files.createFile(outFile) - Files.createFile(errFile) - Files.write(tmpRunFile, java.util.Arrays.asList(args:_*)) - Files.move(tmpRunFile, runFile) val start = System.currentTimeMillis() - val in = Files.newOutputStream(inFile) - val out = Files.newInputStream(outFile) - val err = Files.newInputStream(errFile) - if (!Files.exists(lockBase.resolve("pid"))){ + val inFile = new java.io.File(lockBase + "/stdin") + val outFile = new java.io.File(lockBase + "/stdout") + val errFile = new java.io.File(lockBase + "/stderr") + val runFile = new java.io.File(lockBase + "/run") + val tmpRunFile = new java.io.File(lockBase + "/run-tmp") + val pidFile = new java.io.File(lockBase + "/pid") + + outFile.createNewFile() + errFile.createNewFile() + + val f = new FileOutputStream(tmpRunFile) + var i = 0 + while (i < args.length){ + f.write(args(i).getBytes) + f.write('\n') + i += 1 + } + tmpRunFile.renameTo(runFile) + + val in = new FileOutputStream(inFile) + val out = new FileInputStream(outFile) + val err = new FileInputStream(errFile) + if (!pidFile.exists()){ val selfJar = getClass.getProtectionDomain.getCodeSource.getLocation.toURI.getPath val l = new java.util.ArrayList[String] @@ -60,26 +71,26 @@ object Client{ new java.lang.ProcessBuilder() .command(l) - .redirectInput(inFile.toFile) - .redirectOutput(outFile.toFile) - .redirectError(errFile.toFile) + .redirectInput(inFile) + .redirectOutput(outFile) + .redirectError(errFile) .start() } - - val buffer = new Array[Byte](1024) while({ Thread.sleep(1) - forward(buffer, out, System.out) | - forward(buffer, err, System.err) | - forward(buffer, System.in, in) | - Files.exists(runFile) + while({ + forward(buffer, out, System.out) | + forward(buffer, err, System.err) | + forward(buffer, System.in, in) + })() + + runFile.exists() })() - println("DELTA: " + (System.currentTimeMillis() - start)) - Files.delete(inFile) - Files.delete(outFile) - Files.delete(errFile) + inFile.delete() + outFile.delete() + errFile.delete() } } @@ -93,27 +104,49 @@ object Client{ } } - +class ProxyOutputStream(x: => java.io.OutputStream) extends java.io.OutputStream { + def write(b: Int) = x.write(b) + override def write(b: Array[Byte], off: Int, len: Int) = x.write(b, off, len) + override def write(b: Array[Byte]) = x.write(b) +} +class ProxyInputStream(x: => java.io.InputStream) extends java.io.InputStream{ + def read() = x.read() + override def read(b: Array[Byte], off: Int, len: Int) = x.read(b, off, len) + override def read(b: Array[Byte]) = x.read(b) +} object Server{ def main(args: Array[String]): Unit = { + import java.nio.file.{Paths, Files} val lockBase = Paths.get(args(0)) val runFile = lockBase.resolve("run") var lastRun = System.currentTimeMillis() val pidFile = lockBase.resolve("pid") + var currentIn = System.in + var currentOut: OutputStream = System.out + var currentErr: OutputStream = System.err + System.setOut(new PrintStream(new ProxyOutputStream(currentOut), true)) + System.setErr(new PrintStream(new ProxyOutputStream(currentErr), true)) + System.setIn(new ProxyInputStream(currentIn)) Files.createFile(pidFile) + var mainRunner = Option.empty[(Cli.Config, MainRunner)] try { while (System.currentTimeMillis() - lastRun < 60000) { if (!Files.exists(runFile)) Thread.sleep(10) else { - val in = Files.newInputStream(lockBase.resolve("stdin")) - val out = Files.newOutputStream(lockBase.resolve("stdout")) - val err = Files.newOutputStream(lockBase.resolve("stderr")) + val start = System.currentTimeMillis() + currentIn = Files.newInputStream(lockBase.resolve("stdin")) + currentOut = Files.newOutputStream(lockBase.resolve("stdout")) + currentErr = Files.newOutputStream(lockBase.resolve("stderr")) val args = new String(Files.readAllBytes(runFile)).split('\n') try { - System.setOut(new PrintStream(out)) - System.setErr(new PrintStream(err)) - System.setIn(in) - mill.Main.main0(args) + + val (_, mr) = mill.Main.main0(args, mainRunner) + val end = System.currentTimeMillis() + + mainRunner = mr + System.out.flush() + System.err.flush() + pprint.log(end - start) } finally { Files.delete(runFile) lastRun = System.currentTimeMillis() diff --git a/main/src/mill/main/MainRunner.scala b/main/src/mill/main/MainRunner.scala index c4d50d46..7087dbc7 100644 --- a/main/src/mill/main/MainRunner.scala +++ b/main/src/mill/main/MainRunner.scala @@ -19,13 +19,14 @@ import upickle.Js class MainRunner(config: ammonite.main.Cli.Config, outprintStream: PrintStream, errPrintStream: PrintStream, - stdIn: InputStream) + stdIn: InputStream, + var lastEvaluator: Option[(Seq[(Path, Long)], Evaluator[Any])] = None) extends ammonite.MainRunner( config, outprintStream, errPrintStream, stdIn, outprintStream, errPrintStream ){ - var lastEvaluator: Option[(Seq[(Path, Long)], Evaluator[Any])] = None + override def runScript(scriptPath: Path, scriptArgs: List[String]) = watchLoop( -- cgit v1.2.3