From c98408adf2d96928fe227a740631a8efd8e0c339 Mon Sep 17 00:00:00 2001 From: Li Haoyi Date: Wed, 21 Feb 2018 21:05:37 -0800 Subject: Clean up the provisional client-server code with unit tests and proper file-sockets Seems to work well enough for interactive scala consoles, though still not Ammonite Also Added ScalaModule#launcher and re-worked our build.sc file to use it --- main/src/mill/Main.scala | 72 ++++++- main/src/mill/ServerClient.scala | 312 ------------------------------ main/src/mill/main/MainModule.scala | 2 +- main/src/mill/main/MainRunner.scala | 10 +- main/src/mill/main/ReplApplyHandler.scala | 3 +- main/src/mill/modules/Jvm.scala | 52 ++++- 6 files changed, 129 insertions(+), 322 deletions(-) delete mode 100644 main/src/mill/ServerClient.scala (limited to 'main/src') diff --git a/main/src/mill/Main.scala b/main/src/mill/Main.scala index d2d2f66f..ee055d64 100644 --- a/main/src/mill/Main.scala +++ b/main/src/mill/Main.scala @@ -2,12 +2,82 @@ package mill import java.io.{InputStream, OutputStream, PrintStream} - import ammonite.main.Cli.{formatBlock, genericSignature, replSignature} import ammonite.ops._ import ammonite.util.Util +import mill.clientserver.{Client, FileLocks} import mill.eval.Evaluator + +object ClientMain { + def initServer(lockBase: String) = { + val selfJars = new java.lang.StringBuilder + var current = getClass.getClassLoader + while(current != null){ + getClass.getClassLoader match{ + case e: java.net.URLClassLoader => + val urls = e.getURLs + var i = 0 + while(i < urls.length){ + if (selfJars.length() != 0) selfJars.append(':') + selfJars.append(urls(i)) + i += 1 + } + case _ => + } + current = current.getParent + } + + val l = new java.util.ArrayList[String] + l.add("java") + val props = System.getProperties + val keys = props.stringPropertyNames().iterator() + while(keys.hasNext){ + val k = keys.next() + if (k.startsWith("MILL_")) l.add("-D" + k + "=" + props.getProperty(k)) + } + l.add("-cp") + l.add(selfJars.toString) + l.add("mill.ServerMain") + l.add(lockBase) + new java.lang.ProcessBuilder() + .command(l) + .redirectOutput(new java.io.File(lockBase + "/logs")) + .redirectError(new java.io.File(lockBase + "/logs")) + .start() + } + def main(args: Array[String]): Unit = { + Client.WithLock(1) { lockBase => + val c = new Client( + lockBase, + () => initServer(lockBase), + new FileLocks(lockBase), + System.in, + System.out, + System.err + ) + c.run(args) + } + System.exit(0) + } +} +object ServerMain extends mill.clientserver.ServerMain[Evaluator.State]{ + def main0(args: Array[String], + stateCache: Option[Evaluator.State], + mainInteractive: Boolean, + watchInterrupted: () => Boolean, + stdin: InputStream, + stdout: PrintStream, + stderr: PrintStream) = Main.main0( + args, + stateCache, + mainInteractive, + watchInterrupted, + stdin, + stdout, + stderr + ) +} object Main { def main(args: Array[String]): Unit = { diff --git a/main/src/mill/ServerClient.scala b/main/src/mill/ServerClient.scala deleted file mode 100644 index 383a8865..00000000 --- a/main/src/mill/ServerClient.scala +++ /dev/null @@ -1,312 +0,0 @@ -package mill - -import java.io._ -import java.nio.channels.FileChannel -import java.util - -import ammonite.main.Cli -import mill.eval.Evaluator -import mill.main.MainRunner - -class ServerClient(lockBase: String){ - val inFile = new java.io.File(lockBase + "/stdin") - val outErrFile = new java.io.File(lockBase + "/stdouterr") - val metaFile = new java.io.File(lockBase + "/stdmeta") - val logFile = new java.io.File(lockBase + "/log") - val runFile = new java.io.File(lockBase + "/run") - val tmpRunFile = new java.io.File(lockBase + "/run-tmp") - val pidFile = new java.io.File(lockBase + "/pid") -} -object Client{ - def WithLock[T](index: Int)(f: String => T): T = { - val lockBase = "out/mill-worker-" + index - new java.io.File(lockBase).mkdirs() - val lockFile = new RandomAccessFile(lockBase+ "/lock", "rw") - val channel = lockFile.getChannel - channel.tryLock() match{ - case null => - lockFile.close() - channel.close() - if (index < 5) WithLock(index + 1)(f) - else throw new Exception("Reached max process limit: " + 5) - case locked => - try f(lockBase) - finally{ - locked.release() - lockFile.close() - channel.close() - } - } - } - - def main(args: Array[String]): Unit = { - WithLock(1) { lockBase => - new Client(lockBase).run(args) - } - } -} - - -class Client(lockBase: String) extends ServerClient(lockBase){ - def run(args: Array[String]) = { - - outErrFile.delete() - metaFile.delete() - outErrFile.createNewFile() - metaFile.createNewFile() - inFile.createNewFile() - inFile.createNewFile() - logFile.createNewFile() - - val f = new FileOutputStream(tmpRunFile) - f.write(if (System.console() != null) 1 else 0) - f.write(args.length) - var i = 0 - while (i < args.length){ - f.write(args(i).length) - f.write(args(i).getBytes) - i += 1 - } - f.flush() - - tmpRunFile.renameTo(runFile) - val in = new FileOutputStream(inFile) - val outErr = new FileInputStream(outErrFile) - val meta = new FileInputStream(metaFile) - - val pidRaf = new RandomAccessFile(pidFile, "rw") - val pidLockChannel = pidRaf.getChannel - - if (!probeLock(pidLockChannel)){ - val selfJar = getClass.getProtectionDomain.getCodeSource.getLocation.toURI.getPath - - val l = new java.util.ArrayList[String] - l.add("java") - val props = System.getProperties - val keys = props.stringPropertyNames().iterator() - while(keys.hasNext){ - val k = keys.next() - if (k.startsWith("MILL_")) l.add("-D" + k + "=" + props.getProperty(k)) - } - l.add("-cp") - l.add(selfJar) - l.add("mill.Server") - l.add(lockBase.toString) - - new java.lang.ProcessBuilder() - .command(l) - .redirectInput(inFile) - .redirectOutput(logFile) - .redirectError(logFile) - .start() - } - - - while(!probeLock(pidLockChannel)) Thread.sleep(3) - - try { - val buffer = new Array[Byte](1024) - val metaBuffer = new Array[Byte](1024) - while ({ - Thread.sleep(3) - while ( { - forwardForked(buffer, metaBuffer, meta, outErr) | - forward(buffer, System.in, in) - }) () - - runFile.exists() && probeLock(pidLockChannel) - }) () - }finally { - pidLockChannel.close() - -// pidFile.delete() - inFile.delete() - outErrFile.delete() - metaFile.delete() - } - } - - def probeLock(pidLockChannel: FileChannel) = { - - pidLockChannel.tryLock() match{ - case null => true - case locked => - locked.release() - false - } - - } - def forwardForked(buffer: Array[Byte], - metaBuffer: Array[Byte], - meta: InputStream, - outErr: InputStream) = { - - if (outErr.available() > 0){ - val outErrN = outErr.read(buffer) - if (outErrN > 0) { - var metaN = 0 - while (metaN < outErrN) { - val delta = meta.read(metaBuffer, 0, outErrN - metaN) - if (delta > 0) { - var i = 0 - while (i < delta) { - metaBuffer(i) match { - case 0 => System.out.write(buffer(metaN + i)) - case 1 => System.err.write(buffer(metaN + i)) - } - i += 1 - } - metaN += delta - } - } - } - - true - }else false - } - def forward(buffer: Array[Byte], src: InputStream, dest: OutputStream) = { - if (src.available() != 0){ - val n = src.read(buffer) - dest.write(buffer, 0, n) - true - }else false - } -} - -class ProxyOutputStream(x: => java.io.OutputStream, - meta: => java.io.OutputStream, - key: Int) extends java.io.OutputStream { - override def write(b: Int) = Server.synchronized{ - x.write(b) - meta.write(key) - } -} -class ProxyInputStream(x: => java.io.InputStream) extends java.io.InputStream{ - def read() = x.read() - override def read(b: Array[Byte], off: Int, len: Int) = x.read(b, off, len) - override def read(b: Array[Byte]) = x.read(b) -} - -object Server{ - def main(args0: Array[String]): Unit = { - new Server(args0(0)).run() - - } -} - -class ProbeThread(lockChannel: FileChannel, - mainThread: Thread, - log: PrintStream) extends Runnable{ - var running = true - def run() = { - while({ - Thread.sleep(3) - lockChannel.tryLock() match{ - case null => true && running - case locked => - locked.release() - lockChannel.close() - System.exit(0) - false - } - })() - } -} - -class Server(lockBase: String) extends ServerClient(lockBase){ - var lastRun = System.currentTimeMillis() - var currentIn = System.in - var currentOutErr: OutputStream = System.out - var currentMeta: OutputStream = System.err - val lockFile = new RandomAccessFile(lockBase + "/lock", "rw") - val channel = lockFile.getChannel - var stateCache = Option.empty[Evaluator.State] - - def run() = { - val originalStdout = System.out - originalStdout.println("Initializing") - val pidRaf = new RandomAccessFile(pidFile, "rw") - val lockChannel = pidRaf.getChannel - val lock = lockChannel.tryLock() - if (lock == null) throw new Exception("PID already present") - originalStdout.println("Locked pid file") - try { - while (System.currentTimeMillis() - lastRun < 60000) { - pollOrRun(originalStdout) - originalStdout.println("Delta " + (System.currentTimeMillis() - lastRun)) - originalStdout.println("Threads " + Thread.activeCount()) - } - }finally{ - originalStdout.println("Exiting Server... " + System.currentTimeMillis()) - lock.release() - lockChannel.close() - pidRaf.close() - pidFile.delete() - } - originalStdout.println("END") - } - def pollOrRun(originalStdout: PrintStream) = { - if (!runFile.exists()) Thread.sleep(30) - else try{ - originalStdout.println("Handling Run") - handleRun(originalStdout) - }catch{ - case e: Throwable => - originalStdout.println("Run Failed") - e.printStackTrace(originalStdout) - }finally{ - lastRun = System.currentTimeMillis() - originalStdout.println("Updating lastRun " + lastRun) - } - } - def handleRun(originalStdout: PrintStream) = { - currentIn = new FileInputStream(inFile) - currentOutErr = new FileOutputStream(outErrFile) - currentMeta = new FileOutputStream(metaFile) - val lockChannel = lockFile.getChannel - val probe = new ProbeThread(lockChannel, Thread.currentThread(), originalStdout) - val probeThread = new Thread(probe) - probeThread.start() - val argStream = new FileInputStream(runFile) - val interactive = argStream.read() != 0 - val argsLength = argStream.read() - val args = Array.fill(argsLength){ - val n = argStream.read() - val arr = new Array[Byte](n) - argStream.read(arr) - new String(arr) - } - originalStdout.println("Parsed Args " + args.toList) - try { - originalStdout.println("Running Main") - val (_, newStateCache) = mill.Main.main0( - args, - stateCache, - interactive, - () => { - channel.tryLock() match{ - case null => - false - case lock => - lock.release() - true - } - }, - new ProxyInputStream(currentIn), - new PrintStream(new ProxyOutputStream(currentOutErr, currentMeta, 0), true), - new PrintStream(new ProxyOutputStream(currentOutErr, currentMeta, 1), true) - ) - originalStdout.println("Finished Main") - stateCache = newStateCache - } catch{case MainRunner.WatchInterrupted(sc) => - stateCache = sc - } finally{ -// lockChannel.close() -// lockFile.close() - probe.running = false - probeThread.join() - runFile.delete() - - } - } -} \ No newline at end of file diff --git a/main/src/mill/main/MainModule.scala b/main/src/mill/main/MainModule.scala index d686b1db..feafd35d 100644 --- a/main/src/mill/main/MainModule.scala +++ b/main/src/mill/main/MainModule.scala @@ -79,7 +79,7 @@ trait MainModule extends mill.Module{ // When using `show`, redirect all stdout of the evaluated tasks so the // printed JSON is the only thing printed to stdout. log = evaluator.log match{ - case PrintLogger(c1, c2, o, i, e) => PrintLogger(c1, c2, e, i, e) + case PrintLogger(c1, c2, o, i, e, in) => PrintLogger(c1, c2, e, i, e, in) case l => l } ), diff --git a/main/src/mill/main/MainRunner.scala b/main/src/mill/main/MainRunner.scala index 2bd7fbc6..907f8a7b 100644 --- a/main/src/mill/main/MainRunner.scala +++ b/main/src/mill/main/MainRunner.scala @@ -5,12 +5,9 @@ import ammonite.interp.{Interpreter, Preprocessor} import ammonite.ops.Path import ammonite.util._ import mill.eval.{Evaluator, PathRef} -import mill.main.MainRunner.WatchInterrupted + import mill.util.PrintLogger -object MainRunner{ - case class WatchInterrupted(stateCache: Option[Evaluator.State]) extends Exception -} /** * Customized version of [[ammonite.MainRunner]], allowing us to run Mill @@ -37,7 +34,7 @@ class MainRunner(val config: ammonite.main.Cli.Config, } while(statAll()) { - if (interruptWatch()) throw WatchInterrupted(stateCache) + if (interruptWatch()) throw mill.clientserver.WatchInterrupted(stateCache) Thread.sleep(100) } } @@ -58,7 +55,8 @@ class MainRunner(val config: ammonite.main.Cli.Config, colors, outprintStream, errPrintStream, - errPrintStream + errPrintStream, + stdIn ) ) diff --git a/main/src/mill/main/ReplApplyHandler.scala b/main/src/mill/main/ReplApplyHandler.scala index 5681d75b..a2b042ad 100644 --- a/main/src/mill/main/ReplApplyHandler.scala +++ b/main/src/mill/main/ReplApplyHandler.scala @@ -24,7 +24,8 @@ object ReplApplyHandler{ colors, System.out, System.err, - System.err + System.err, + System.in ) ) ) diff --git a/main/src/mill/modules/Jvm.scala b/main/src/mill/modules/Jvm.scala index 297dcf1f..2d351c18 100644 --- a/main/src/mill/modules/Jvm.scala +++ b/main/src/mill/modules/Jvm.scala @@ -7,6 +7,7 @@ import java.nio.file.attribute.PosixFilePermission import java.util.jar.{JarEntry, JarFile, JarOutputStream} import ammonite.ops._ +import mill.clientserver.{ClientInputPumper, ClientServer} import mill.define.Task import mill.eval.PathRef import mill.util.{Ctx, Loose} @@ -27,6 +28,7 @@ object Jvm { envArgs: Map[String, String] = Map.empty, mainArgs: Seq[String] = Seq.empty, workingDir: Path = null): Unit = { + import ammonite.ops.ImplicitWd._ val commandArgs = Vector("java") ++ @@ -34,7 +36,32 @@ object Jvm { Vector("-cp", classPath.mkString(":"), mainClass) ++ mainArgs - %.copy(envArgs = envArgs)(commandArgs)(workingDir) + val builder = new java.lang.ProcessBuilder() + import collection.JavaConverters._ + for ((k, v) <- envArgs){ + if (v != null) builder.environment().put(k, v) + else builder.environment().remove(k) + } + builder.directory(workingDir.toIO) + + val process = + builder + .command(commandArgs:_*) + .start() + + val sources = Seq( + process.getInputStream -> System.out, + process.getErrorStream -> System.err, + System.in -> process.getOutputStream + ) + + for((std, dest) <- sources){ + new Thread(new ClientInputPumper(std, dest)).start() + } + + val exitCode = process.waitFor() + if (exitCode == 0) () + else throw InteractiveShelloutException() } def runLocal(mainClass: String, @@ -253,5 +280,28 @@ object Jvm { } PathRef(outputPath) } + def launcherShellScript(mainClass: String, + classPath: Agg[String], + jvmArgs: Seq[String]) = { + s"""#!/usr/bin/env sh + | + |exec java ${jvmArgs.mkString(" ")} $$JAVA_OPTS -cp "${classPath.mkString(":")}" $mainClass "$$@" + """.stripMargin + } + def createLauncher(mainClass: String, + classPath: Agg[Path], + jvmArgs: Seq[String]) + (implicit ctx: Ctx.Dest)= { + val outputPath = ctx.dest / "run" + + write(outputPath, launcherShellScript(mainClass, classPath.map(_.toString), jvmArgs)) + + val perms = java.nio.file.Files.getPosixFilePermissions(outputPath.toNIO) + perms.add(PosixFilePermission.GROUP_EXECUTE) + perms.add(PosixFilePermission.OWNER_EXECUTE) + perms.add(PosixFilePermission.OTHERS_EXECUTE) + java.nio.file.Files.setPosixFilePermissions(outputPath.toNIO, perms) + PathRef(outputPath) + } } -- cgit v1.2.3