From a7cb99f1bce04366f688d36bc9faef30161da8e7 Mon Sep 17 00:00:00 2001 From: Li Haoyi Date: Sat, 19 May 2018 09:37:25 -0700 Subject: WIP keep mill server alive if you Ctrl-C during --watch (#339) * wip * Clean up more resources in the Mill client after every command * catch and ignore SIGINT in Mill server to make it survive Ctrl-C on the client --- main/src/mill/Main.scala | 127 ----------------- main/src/mill/MillMain.scala | 130 +++++++++++++++++ main/src/mill/main/MainRunner.scala | 6 +- main/src/mill/main/MillServerMain.scala | 242 ++++++++++++++++++++++++++++++++ main/src/mill/main/Server.scala | 216 ---------------------------- 5 files changed, 376 insertions(+), 345 deletions(-) delete mode 100644 main/src/mill/Main.scala create mode 100644 main/src/mill/MillMain.scala create mode 100644 main/src/mill/main/MillServerMain.scala delete mode 100644 main/src/mill/main/Server.scala (limited to 'main/src') diff --git a/main/src/mill/Main.scala b/main/src/mill/Main.scala deleted file mode 100644 index 5573a325..00000000 --- a/main/src/mill/Main.scala +++ /dev/null @@ -1,127 +0,0 @@ -package mill - -import java.io.{InputStream, PrintStream} - -import scala.collection.JavaConverters._ -import ammonite.main.Cli._ -import ammonite.ops._ -import io.github.retronym.java9rtexport.Export -import mill.eval.Evaluator -import mill.util.DummyInputStream - -object Main { - - def main(args: Array[String]): Unit = { - val as = args match { - case Array(s, _*) if s == "-i" || s == "--interactive" => args.tail - case _ => args - } - val (result, _) = main0( - as, - None, - ammonite.Main.isInteractive(), - System.in, - System.out, - System.err, - System.getenv().asScala.toMap - ) - System.exit(if(result) 0 else 1) - } - - def main0(args: Array[String], - stateCache: Option[Evaluator.State], - mainInteractive: Boolean, - stdin: InputStream, - stdout: PrintStream, - stderr: PrintStream, - env: Map[String, String]): (Boolean, Option[Evaluator.State]) = { - import ammonite.main.Cli - - val removed = Set("predef-code", "no-home-predef") - var interactive = false - val interactiveSignature = Arg[Config, Unit]( - "interactive", Some('i'), - "Run Mill in interactive mode, suitable for opening REPLs and taking user input", - (c, v) =>{ - interactive = true - c - } - ) - val millArgSignature = - Cli.genericSignature.filter(a => !removed(a.name)) :+ interactiveSignature - - val millHome = mill.util.Ctx.defaultHome - - Cli.groupArgs( - args.toList, - millArgSignature, - Cli.Config(home = millHome, remoteLogging = false) - ) match{ - case _ if interactive => - stderr.println("-i/--interactive must be passed in as the first argument") - (false, None) - case Left(msg) => - stderr.println(msg) - (false, None) - case Right((cliConfig, _)) if cliConfig.help => - val leftMargin = millArgSignature.map(ammonite.main.Cli.showArg(_).length).max + 2 - stdout.println( - s"""Mill Build Tool - |usage: mill [mill-options] [target [target-options]] - | - |${formatBlock(millArgSignature, leftMargin).mkString(ammonite.util.Util.newLine)}""".stripMargin - ) - (true, None) - case Right((cliConfig, leftoverArgs)) => - - val repl = leftoverArgs.isEmpty - if (repl && stdin == DummyInputStream) { - stderr.println("Build repl needs to be run with the -i/--interactive flag") - (false, stateCache) - }else{ - val tqs = "\"\"\"" - val config = - if(!repl) cliConfig - else cliConfig.copy( - predefCode = - s"""import $$file.build, build._ - |implicit val replApplyHandler = mill.main.ReplApplyHandler( - | ammonite.ops.Path($tqs${cliConfig.home.toIO.getCanonicalPath.replaceAllLiterally("$", "$$")}$tqs), - | interp.colors(), - | repl.pprinter(), - | build.millSelf.get, - | build.millDiscover - |) - |repl.pprinter() = replApplyHandler.pprinter - |import replApplyHandler.generatedEval._ - | - """.stripMargin, - welcomeBanner = None - ) - - val runner = new mill.main.MainRunner( - config.copy(colored = Some(mainInteractive)), - stdout, stderr, stdin, - stateCache, - env - ) - - if (mill.main.client.Util.isJava9OrAbove) { - val rt = cliConfig.home / Export.rtJarName - if (!exists(rt)) { - runner.printInfo(s"Preparing Java ${System.getProperty("java.version")} runtime; this may take a minute or two ...") - Export.rtTo(rt.toIO, false) - } - } - - if (repl){ - runner.printInfo("Loading...") - (runner.watchLoop(isRepl = true, printing = false, _.run()), runner.stateCache) - } else { - (runner.runScript(pwd / "build.sc", leftoverArgs), runner.stateCache) - } - } - - } - } -} diff --git a/main/src/mill/MillMain.scala b/main/src/mill/MillMain.scala new file mode 100644 index 00000000..ad1bd39d --- /dev/null +++ b/main/src/mill/MillMain.scala @@ -0,0 +1,130 @@ +package mill + +import java.io.{InputStream, PrintStream} + +import scala.collection.JavaConverters._ +import ammonite.main.Cli._ +import ammonite.ops._ +import io.github.retronym.java9rtexport.Export +import mill.eval.Evaluator +import mill.util.DummyInputStream + +object MillMain { + + def main(args: Array[String]): Unit = { + val as = args match { + case Array(s, _*) if s == "-i" || s == "--interactive" => args.tail + case _ => args + } + val (result, _) = main0( + as, + None, + ammonite.Main.isInteractive(), + System.in, + System.out, + System.err, + System.getenv().asScala.toMap, + b => () + ) + System.exit(if(result) 0 else 1) + } + + def main0(args: Array[String], + stateCache: Option[Evaluator.State], + mainInteractive: Boolean, + stdin: InputStream, + stdout: PrintStream, + stderr: PrintStream, + env: Map[String, String], + setIdle: Boolean => Unit): (Boolean, Option[Evaluator.State]) = { + import ammonite.main.Cli + + val removed = Set("predef-code", "no-home-predef") + var interactive = false + val interactiveSignature = Arg[Config, Unit]( + "interactive", Some('i'), + "Run Mill in interactive mode, suitable for opening REPLs and taking user input", + (c, v) =>{ + interactive = true + c + } + ) + val millArgSignature = + Cli.genericSignature.filter(a => !removed(a.name)) :+ interactiveSignature + + val millHome = mill.util.Ctx.defaultHome + + Cli.groupArgs( + args.toList, + millArgSignature, + Cli.Config(home = millHome, remoteLogging = false) + ) match{ + case _ if interactive => + stderr.println("-i/--interactive must be passed in as the first argument") + (false, None) + case Left(msg) => + stderr.println(msg) + (false, None) + case Right((cliConfig, _)) if cliConfig.help => + val leftMargin = millArgSignature.map(ammonite.main.Cli.showArg(_).length).max + 2 + stdout.println( + s"""Mill Build Tool + |usage: mill [mill-options] [target [target-options]] + | + |${formatBlock(millArgSignature, leftMargin).mkString(ammonite.util.Util.newLine)}""".stripMargin + ) + (true, None) + case Right((cliConfig, leftoverArgs)) => + + val repl = leftoverArgs.isEmpty + if (repl && stdin == DummyInputStream) { + stderr.println("Build repl needs to be run with the -i/--interactive flag") + (false, stateCache) + }else{ + val tqs = "\"\"\"" + val config = + if(!repl) cliConfig + else cliConfig.copy( + predefCode = + s"""import $$file.build, build._ + |implicit val replApplyHandler = mill.main.ReplApplyHandler( + | ammonite.ops.Path($tqs${cliConfig.home.toIO.getCanonicalPath.replaceAllLiterally("$", "$$")}$tqs), + | interp.colors(), + | repl.pprinter(), + | build.millSelf.get, + | build.millDiscover + |) + |repl.pprinter() = replApplyHandler.pprinter + |import replApplyHandler.generatedEval._ + | + """.stripMargin, + welcomeBanner = None + ) + + val runner = new mill.main.MainRunner( + config.copy(colored = Some(mainInteractive)), + stdout, stderr, stdin, + stateCache, + env, + setIdle + ) + + if (mill.main.client.Util.isJava9OrAbove) { + val rt = cliConfig.home / Export.rtJarName + if (!exists(rt)) { + runner.printInfo(s"Preparing Java ${System.getProperty("java.version")} runtime; this may take a minute or two ...") + Export.rtTo(rt.toIO, false) + } + } + + if (repl){ + runner.printInfo("Loading...") + (runner.watchLoop(isRepl = true, printing = false, _.run()), runner.stateCache) + } else { + (runner.runScript(pwd / "build.sc", leftoverArgs), runner.stateCache) + } + } + + } + } +} diff --git a/main/src/mill/main/MainRunner.scala b/main/src/mill/main/MainRunner.scala index a289db5f..bf0d5901 100644 --- a/main/src/mill/main/MainRunner.scala +++ b/main/src/mill/main/MainRunner.scala @@ -22,7 +22,8 @@ class MainRunner(val config: ammonite.main.Cli.Config, errPrintStream: PrintStream, stdIn: InputStream, stateCache0: Option[Evaluator.State] = None, - env : Map[String, String]) + env : Map[String, String], + setIdle: Boolean => Unit) extends ammonite.MainRunner( config, outprintStream, errPrintStream, stdIn, outprintStream, errPrintStream @@ -35,8 +36,9 @@ class MainRunner(val config: ammonite.main.Cli.Config, def statAll() = watched.forall{ case (file, lastMTime) => Interpreter.pathSignature(file) == lastMTime } - + setIdle(true) while(statAll()) Thread.sleep(100) + setIdle(false) } /** diff --git a/main/src/mill/main/MillServerMain.scala b/main/src/mill/main/MillServerMain.scala new file mode 100644 index 00000000..092d958c --- /dev/null +++ b/main/src/mill/main/MillServerMain.scala @@ -0,0 +1,242 @@ +package mill.main + +import java.io._ +import java.net.Socket + +import mill.MillMain + +import scala.collection.JavaConverters._ +import org.scalasbt.ipcsocket._ +import mill.main.client._ +import mill.eval.Evaluator +import mill.util.DummyInputStream +import sun.misc.{Signal, SignalHandler} + +trait MillServerMain[T]{ + var stateCache = Option.empty[T] + def main0(args: Array[String], + stateCache: Option[T], + mainInteractive: Boolean, + stdin: InputStream, + stdout: PrintStream, + stderr: PrintStream, + env : Map[String, String], + setIdle: Boolean => Unit): (Boolean, Option[T]) +} + +object MillServerMain extends mill.main.MillServerMain[Evaluator.State]{ + def main(args0: Array[String]): Unit = { + // Disable SIGINT interrupt signal in the Mill server. + // + // This gets passed through from the client to server whenever the user + // hits `Ctrl-C`, which by default kills the server, which defeats the purpose + // of running a background server. Furthermore, the background server already + // can detect when the Mill client goes away, which is necessary to handle + // the case when a Mill client that did *not* spawn the server gets `CTRL-C`ed + Signal.handle(new Signal("INT"), new SignalHandler () { + def handle(sig: Signal) = {} // do nothing + }) + new Server( + args0(0), + this, + () => System.exit(0), + 300000, + mill.main.client.Locks.files(args0(0)) + ).run() + } + def main0(args: Array[String], + stateCache: Option[Evaluator.State], + mainInteractive: Boolean, + stdin: InputStream, + stdout: PrintStream, + stderr: PrintStream, + env : Map[String, String], + setIdle: Boolean => Unit) = { + MillMain.main0( + args, + stateCache, + mainInteractive, + DummyInputStream, + stdout, + stderr, + env, + setIdle + ) + } +} + + +class Server[T](lockBase: String, + sm: MillServerMain[T], + interruptServer: () => Unit, + acceptTimeout: Int, + locks: Locks) { + + val originalStdout = System.out + def run() = { + Server.tryLockBlock(locks.processLock){ + var running = true + while (running) { + Server.lockBlock(locks.serverLock){ + val (serverSocket, socketClose) = if (Util.isWindows) { + val socketName = Util.WIN32_PIPE_PREFIX + new File(lockBase).getName + (new Win32NamedPipeServerSocket(socketName), () => new Win32NamedPipeSocket(socketName).close()) + } else { + val socketName = lockBase + "/io" + new File(socketName).delete() + (new UnixDomainServerSocket(socketName), () => new UnixDomainSocket(socketName).close()) + } + + val sockOpt = Server.interruptWith( + "MillSocketTimeoutInterruptThread", + acceptTimeout, + socketClose(), + serverSocket.accept() + ) + + sockOpt match{ + case None => running = false + case Some(sock) => + try { + handleRun(sock) + serverSocket.close() + } + catch{case e: Throwable => e.printStackTrace(originalStdout) } + } + } + // Make sure you give an opportunity for the client to probe the lock + // and realize the server has released it to signal completion + Thread.sleep(10) + } + }.getOrElse(throw new Exception("PID already present")) + } + + def handleRun(clientSocket: Socket) = { + + val currentOutErr = clientSocket.getOutputStream + val stdout = new PrintStream(new ProxyOutputStream(currentOutErr, 0), true) + val stderr = new PrintStream(new ProxyOutputStream(currentOutErr, 1), true) + val socketIn = clientSocket.getInputStream + val argStream = new FileInputStream(lockBase + "/run") + val interactive = argStream.read() != 0 + val clientMillVersion = Util.readString(argStream) + val serverMillVersion = sys.props("MILL_VERSION") + if (clientMillVersion != serverMillVersion) { + stdout.println(s"Mill version changed ($serverMillVersion -> $clientMillVersion), re-starting server") + System.exit(0) + } + val args = Util.parseArgs(argStream) + val env = Util.parseMap(argStream) + argStream.close() + + @volatile var done = false + @volatile var idle = false + val t = new Thread(() => + + try { + val (result, newStateCache) = sm.main0( + args, + sm.stateCache, + interactive, + socketIn, + stdout, + stderr, + env.asScala.toMap, + idle = _ + ) + + sm.stateCache = newStateCache + java.nio.file.Files.write( + java.nio.file.Paths.get(lockBase + "/exitCode"), + (if (result) 0 else 1).toString.getBytes + ) + } catch{case WatchInterrupted(sc: Option[T]) => + sm.stateCache = sc + } finally{ + done = true + idle = true + }, + "MillServerActionRunner" + ) + t.start() + // We cannot simply use Lock#await here, because the filesystem doesn't + // realize the clientLock/serverLock are held by different threads in the + // two processes and gives a spurious deadlock error + while(!done && !locks.clientLock.probe()) Thread.sleep(3) + + if (!idle) interruptServer() + + + t.interrupt() + t.stop() + + if (Util.isWindows) { + // Closing Win32NamedPipeSocket can often take ~5s + // It seems OK to exit the client early and subsequently + // start up mill client again (perhaps closing the server + // socket helps speed up the process). + val t = new Thread(() => clientSocket.close()) + t.setDaemon(true) + t.start() + } else clientSocket.close() + } +} +object Server{ + def lockBlock[T](lock: Lock)(t: => T): T = { + val l = lock.lock() + try t + finally l.release() + } + def tryLockBlock[T](lock: Lock)(t: => T): Option[T] = { + lock.tryLock() match{ + case null => None + case l => + try Some(t) + finally l.release() + } + + } + def interruptWith[T](threadName: String, millis: Int, close: => Unit, t: => T): Option[T] = { + @volatile var interrupt = true + @volatile var interrupted = false + val thread = new Thread( + () => { + try Thread.sleep(millis) + catch{ case t: InterruptedException => /* Do Nothing */ } + if (interrupt) { + interrupted = true + close + } + }, + threadName + ) + + thread.start() + try { + val res = + try Some(t) + catch {case e: Throwable => None} + + if (interrupted) None + else res + + } finally { + thread.interrupt() + interrupt = false + } + } +} + +class ProxyOutputStream(x: => java.io.OutputStream, + key: Int) extends java.io.OutputStream { + override def write(b: Int) = x.synchronized{ + x.write(key) + x.write(b) + } +} +class ProxyInputStream(x: => java.io.InputStream) extends java.io.InputStream{ + def read() = x.read() + override def read(b: Array[Byte], off: Int, len: Int) = x.read(b, off, len) + override def read(b: Array[Byte]) = x.read(b) +} +case class WatchInterrupted[T](stateCache: Option[T]) extends Exception diff --git a/main/src/mill/main/Server.scala b/main/src/mill/main/Server.scala deleted file mode 100644 index 07703bed..00000000 --- a/main/src/mill/main/Server.scala +++ /dev/null @@ -1,216 +0,0 @@ -package mill.main - -import java.io._ -import java.net.Socket - -import mill.Main -import scala.collection.JavaConverters._ -import org.scalasbt.ipcsocket._ -import mill.main.client._ -import mill.eval.Evaluator -import mill.util.DummyInputStream - -trait ServerMain[T]{ - var stateCache = Option.empty[T] - def main0(args: Array[String], - stateCache: Option[T], - mainInteractive: Boolean, - stdin: InputStream, - stdout: PrintStream, - stderr: PrintStream, - env : Map[String, String]): (Boolean, Option[T]) -} - -object ServerMain extends mill.main.ServerMain[Evaluator.State]{ - def main(args0: Array[String]): Unit = { - new Server( - args0(0), - this, - () => System.exit(0), - 300000, - mill.main.client.Locks.files(args0(0)) - ).run() - } - def main0(args: Array[String], - stateCache: Option[Evaluator.State], - mainInteractive: Boolean, - stdin: InputStream, - stdout: PrintStream, - stderr: PrintStream, - env : Map[String, String]) = Main.main0( - args, - stateCache, - mainInteractive, - DummyInputStream, - stdout, - stderr, - env - ) -} - - -class Server[T](lockBase: String, - sm: ServerMain[T], - interruptServer: () => Unit, - acceptTimeout: Int, - locks: Locks) { - - val originalStdout = System.out - def run() = { - Server.tryLockBlock(locks.processLock){ - var running = true - while (running) { - Server.lockBlock(locks.serverLock){ - val (serverSocket, socketClose) = if (Util.isWindows) { - val socketName = Util.WIN32_PIPE_PREFIX + new File(lockBase).getName - (new Win32NamedPipeServerSocket(socketName), () => new Win32NamedPipeSocket(socketName).close()) - } else { - val socketName = lockBase + "/io" - new File(socketName).delete() - (new UnixDomainServerSocket(socketName), () => new UnixDomainSocket(socketName).close()) - } - - val sockOpt = Server.interruptWith( - acceptTimeout, - socketClose(), - serverSocket.accept() - ) - - sockOpt match{ - case None => running = false - case Some(sock) => - try { - handleRun(sock) - serverSocket.close() - } - catch{case e: Throwable => e.printStackTrace(originalStdout) } - } - } - // Make sure you give an opportunity for the client to probe the lock - // and realize the server has released it to signal completion - Thread.sleep(10) - } - }.getOrElse(throw new Exception("PID already present")) - } - - def handleRun(clientSocket: Socket) = { - - val currentOutErr = clientSocket.getOutputStream - val stdout = new PrintStream(new ProxyOutputStream(currentOutErr, 0), true) - val stderr = new PrintStream(new ProxyOutputStream(currentOutErr, 1), true) - val socketIn = clientSocket.getInputStream - val argStream = new FileInputStream(lockBase + "/run") - val interactive = argStream.read() != 0 - val clientMillVersion = Util.readString(argStream) - val serverMillVersion = sys.props("MILL_VERSION") - if (clientMillVersion != serverMillVersion) { - stdout.println(s"Mill version changed ($serverMillVersion -> $clientMillVersion), re-starting server") - System.exit(0) - } - val args = Util.parseArgs(argStream) - val env = Util.parseMap(argStream) - argStream.close() - - var done = false - val t = new Thread(() => - - try { - val (result, newStateCache) = sm.main0( - args, - sm.stateCache, - interactive, - socketIn, - stdout, - stderr, - env.asScala.toMap - ) - - sm.stateCache = newStateCache - java.nio.file.Files.write( - java.nio.file.Paths.get(lockBase + "/exitCode"), - (if (result) 0 else 1).toString.getBytes - ) - } catch{case WatchInterrupted(sc: Option[T]) => - sm.stateCache = sc - } finally{ - done = true - } - ) - - t.start() - // We cannot simply use Lock#await here, because the filesystem doesn't - // realize the clientLock/serverLock are held by different threads in the - // two processes and gives a spurious deadlock error - while(!done && !locks.clientLock.probe()) { - Thread.sleep(3) - } - - if (!done) interruptServer() - - t.interrupt() - t.stop() - - if (Util.isWindows) { - // Closing Win32NamedPipeSocket can often take ~5s - // It seems OK to exit the client early and subsequently - // start up mill client again (perhaps closing the server - // socket helps speed up the process). - val t = new Thread(() => clientSocket.close()) - t.setDaemon(true) - t.start() - } else clientSocket.close() - } -} -object Server{ - def lockBlock[T](lock: Lock)(t: => T): T = { - val l = lock.lock() - try t - finally l.release() - } - def tryLockBlock[T](lock: Lock)(t: => T): Option[T] = { - lock.tryLock() match{ - case null => None - case l => - try Some(t) - finally l.release() - } - - } - def interruptWith[T](millis: Int, close: => Unit, t: => T): Option[T] = { - @volatile var interrupt = true - @volatile var interrupted = false - new Thread(() => { - Thread.sleep(millis) - if (interrupt) { - interrupted = true - close - } - }).start() - - try { - val res = - try Some(t) - catch {case e: Throwable => None} - - if (interrupted) None - else res - - } finally { - interrupt = false - } - } -} - -class ProxyOutputStream(x: => java.io.OutputStream, - key: Int) extends java.io.OutputStream { - override def write(b: Int) = x.synchronized{ - x.write(key) - x.write(b) - } -} -class ProxyInputStream(x: => java.io.InputStream) extends java.io.InputStream{ - def read() = x.read() - override def read(b: Array[Byte], off: Int, len: Int) = x.read(b, off, len) - override def read(b: Array[Byte]) = x.read(b) -} -case class WatchInterrupted[T](stateCache: Option[T]) extends Exception -- cgit v1.2.3