summaryrefslogtreecommitdiff
path: root/main
diff options
context:
space:
mode:
authorLi Haoyi <haoyi.sg@gmail.com>2018-02-21 21:05:37 -0800
committerLi Haoyi <haoyi.sg@gmail.com>2018-02-24 17:13:03 -0800
commitc98408adf2d96928fe227a740631a8efd8e0c339 (patch)
tree5a36d9ee7d8ee6e1f7f9247cd7ddd31b194df5df /main
parent51db54d4f1deefb34b9d7f6581611ae166652493 (diff)
downloadmill-c98408adf2d96928fe227a740631a8efd8e0c339.tar.gz
mill-c98408adf2d96928fe227a740631a8efd8e0c339.tar.bz2
mill-c98408adf2d96928fe227a740631a8efd8e0c339.zip
Clean up the provisional client-server code with unit tests and proper file-sockets
Seems to work well enough for interactive scala consoles, though still not Ammonite Also Added ScalaModule#launcher and re-worked our build.sc file to use it
Diffstat (limited to 'main')
-rw-r--r--main/src/mill/Main.scala72
-rw-r--r--main/src/mill/ServerClient.scala312
-rw-r--r--main/src/mill/main/MainModule.scala2
-rw-r--r--main/src/mill/main/MainRunner.scala10
-rw-r--r--main/src/mill/main/ReplApplyHandler.scala3
-rw-r--r--main/src/mill/modules/Jvm.scala52
6 files changed, 129 insertions, 322 deletions
diff --git a/main/src/mill/Main.scala b/main/src/mill/Main.scala
index d2d2f66f..ee055d64 100644
--- a/main/src/mill/Main.scala
+++ b/main/src/mill/Main.scala
@@ -2,12 +2,82 @@ package mill
import java.io.{InputStream, OutputStream, PrintStream}
-
import ammonite.main.Cli.{formatBlock, genericSignature, replSignature}
import ammonite.ops._
import ammonite.util.Util
+import mill.clientserver.{Client, FileLocks}
import mill.eval.Evaluator
+
+object ClientMain {
+ def initServer(lockBase: String) = {
+ val selfJars = new java.lang.StringBuilder
+ var current = getClass.getClassLoader
+ while(current != null){
+ getClass.getClassLoader match{
+ case e: java.net.URLClassLoader =>
+ val urls = e.getURLs
+ var i = 0
+ while(i < urls.length){
+ if (selfJars.length() != 0) selfJars.append(':')
+ selfJars.append(urls(i))
+ i += 1
+ }
+ case _ =>
+ }
+ current = current.getParent
+ }
+
+ val l = new java.util.ArrayList[String]
+ l.add("java")
+ val props = System.getProperties
+ val keys = props.stringPropertyNames().iterator()
+ while(keys.hasNext){
+ val k = keys.next()
+ if (k.startsWith("MILL_")) l.add("-D" + k + "=" + props.getProperty(k))
+ }
+ l.add("-cp")
+ l.add(selfJars.toString)
+ l.add("mill.ServerMain")
+ l.add(lockBase)
+ new java.lang.ProcessBuilder()
+ .command(l)
+ .redirectOutput(new java.io.File(lockBase + "/logs"))
+ .redirectError(new java.io.File(lockBase + "/logs"))
+ .start()
+ }
+ def main(args: Array[String]): Unit = {
+ Client.WithLock(1) { lockBase =>
+ val c = new Client(
+ lockBase,
+ () => initServer(lockBase),
+ new FileLocks(lockBase),
+ System.in,
+ System.out,
+ System.err
+ )
+ c.run(args)
+ }
+ System.exit(0)
+ }
+}
+object ServerMain extends mill.clientserver.ServerMain[Evaluator.State]{
+ def main0(args: Array[String],
+ stateCache: Option[Evaluator.State],
+ mainInteractive: Boolean,
+ watchInterrupted: () => Boolean,
+ stdin: InputStream,
+ stdout: PrintStream,
+ stderr: PrintStream) = Main.main0(
+ args,
+ stateCache,
+ mainInteractive,
+ watchInterrupted,
+ stdin,
+ stdout,
+ stderr
+ )
+}
object Main {
def main(args: Array[String]): Unit = {
diff --git a/main/src/mill/ServerClient.scala b/main/src/mill/ServerClient.scala
deleted file mode 100644
index 383a8865..00000000
--- a/main/src/mill/ServerClient.scala
+++ /dev/null
@@ -1,312 +0,0 @@
-package mill
-
-import java.io._
-import java.nio.channels.FileChannel
-import java.util
-
-import ammonite.main.Cli
-import mill.eval.Evaluator
-import mill.main.MainRunner
-
-class ServerClient(lockBase: String){
- val inFile = new java.io.File(lockBase + "/stdin")
- val outErrFile = new java.io.File(lockBase + "/stdouterr")
- val metaFile = new java.io.File(lockBase + "/stdmeta")
- val logFile = new java.io.File(lockBase + "/log")
- val runFile = new java.io.File(lockBase + "/run")
- val tmpRunFile = new java.io.File(lockBase + "/run-tmp")
- val pidFile = new java.io.File(lockBase + "/pid")
-}
-object Client{
- def WithLock[T](index: Int)(f: String => T): T = {
- val lockBase = "out/mill-worker-" + index
- new java.io.File(lockBase).mkdirs()
- val lockFile = new RandomAccessFile(lockBase+ "/lock", "rw")
- val channel = lockFile.getChannel
- channel.tryLock() match{
- case null =>
- lockFile.close()
- channel.close()
- if (index < 5) WithLock(index + 1)(f)
- else throw new Exception("Reached max process limit: " + 5)
- case locked =>
- try f(lockBase)
- finally{
- locked.release()
- lockFile.close()
- channel.close()
- }
- }
- }
-
- def main(args: Array[String]): Unit = {
- WithLock(1) { lockBase =>
- new Client(lockBase).run(args)
- }
- }
-}
-
-
-class Client(lockBase: String) extends ServerClient(lockBase){
- def run(args: Array[String]) = {
-
- outErrFile.delete()
- metaFile.delete()
- outErrFile.createNewFile()
- metaFile.createNewFile()
- inFile.createNewFile()
- inFile.createNewFile()
- logFile.createNewFile()
-
- val f = new FileOutputStream(tmpRunFile)
- f.write(if (System.console() != null) 1 else 0)
- f.write(args.length)
- var i = 0
- while (i < args.length){
- f.write(args(i).length)
- f.write(args(i).getBytes)
- i += 1
- }
- f.flush()
-
- tmpRunFile.renameTo(runFile)
- val in = new FileOutputStream(inFile)
- val outErr = new FileInputStream(outErrFile)
- val meta = new FileInputStream(metaFile)
-
- val pidRaf = new RandomAccessFile(pidFile, "rw")
- val pidLockChannel = pidRaf.getChannel
-
- if (!probeLock(pidLockChannel)){
- val selfJar = getClass.getProtectionDomain.getCodeSource.getLocation.toURI.getPath
-
- val l = new java.util.ArrayList[String]
- l.add("java")
- val props = System.getProperties
- val keys = props.stringPropertyNames().iterator()
- while(keys.hasNext){
- val k = keys.next()
- if (k.startsWith("MILL_")) l.add("-D" + k + "=" + props.getProperty(k))
- }
- l.add("-cp")
- l.add(selfJar)
- l.add("mill.Server")
- l.add(lockBase.toString)
-
- new java.lang.ProcessBuilder()
- .command(l)
- .redirectInput(inFile)
- .redirectOutput(logFile)
- .redirectError(logFile)
- .start()
- }
-
-
- while(!probeLock(pidLockChannel)) Thread.sleep(3)
-
- try {
- val buffer = new Array[Byte](1024)
- val metaBuffer = new Array[Byte](1024)
- while ({
- Thread.sleep(3)
- while ( {
- forwardForked(buffer, metaBuffer, meta, outErr) |
- forward(buffer, System.in, in)
- }) ()
-
- runFile.exists() && probeLock(pidLockChannel)
- }) ()
- }finally {
- pidLockChannel.close()
-
-// pidFile.delete()
- inFile.delete()
- outErrFile.delete()
- metaFile.delete()
- }
- }
-
- def probeLock(pidLockChannel: FileChannel) = {
-
- pidLockChannel.tryLock() match{
- case null => true
- case locked =>
- locked.release()
- false
- }
-
- }
- def forwardForked(buffer: Array[Byte],
- metaBuffer: Array[Byte],
- meta: InputStream,
- outErr: InputStream) = {
-
- if (outErr.available() > 0){
- val outErrN = outErr.read(buffer)
- if (outErrN > 0) {
- var metaN = 0
- while (metaN < outErrN) {
- val delta = meta.read(metaBuffer, 0, outErrN - metaN)
- if (delta > 0) {
- var i = 0
- while (i < delta) {
- metaBuffer(i) match {
- case 0 => System.out.write(buffer(metaN + i))
- case 1 => System.err.write(buffer(metaN + i))
- }
- i += 1
- }
- metaN += delta
- }
- }
- }
-
- true
- }else false
- }
- def forward(buffer: Array[Byte], src: InputStream, dest: OutputStream) = {
- if (src.available() != 0){
- val n = src.read(buffer)
- dest.write(buffer, 0, n)
- true
- }else false
- }
-}
-
-class ProxyOutputStream(x: => java.io.OutputStream,
- meta: => java.io.OutputStream,
- key: Int) extends java.io.OutputStream {
- override def write(b: Int) = Server.synchronized{
- x.write(b)
- meta.write(key)
- }
-}
-class ProxyInputStream(x: => java.io.InputStream) extends java.io.InputStream{
- def read() = x.read()
- override def read(b: Array[Byte], off: Int, len: Int) = x.read(b, off, len)
- override def read(b: Array[Byte]) = x.read(b)
-}
-
-object Server{
- def main(args0: Array[String]): Unit = {
- new Server(args0(0)).run()
-
- }
-}
-
-class ProbeThread(lockChannel: FileChannel,
- mainThread: Thread,
- log: PrintStream) extends Runnable{
- var running = true
- def run() = {
- while({
- Thread.sleep(3)
- lockChannel.tryLock() match{
- case null => true && running
- case locked =>
- locked.release()
- lockChannel.close()
- System.exit(0)
- false
- }
- })()
- }
-}
-
-class Server(lockBase: String) extends ServerClient(lockBase){
- var lastRun = System.currentTimeMillis()
- var currentIn = System.in
- var currentOutErr: OutputStream = System.out
- var currentMeta: OutputStream = System.err
- val lockFile = new RandomAccessFile(lockBase + "/lock", "rw")
- val channel = lockFile.getChannel
- var stateCache = Option.empty[Evaluator.State]
-
- def run() = {
- val originalStdout = System.out
- originalStdout.println("Initializing")
- val pidRaf = new RandomAccessFile(pidFile, "rw")
- val lockChannel = pidRaf.getChannel
- val lock = lockChannel.tryLock()
- if (lock == null) throw new Exception("PID already present")
- originalStdout.println("Locked pid file")
- try {
- while (System.currentTimeMillis() - lastRun < 60000) {
- pollOrRun(originalStdout)
- originalStdout.println("Delta " + (System.currentTimeMillis() - lastRun))
- originalStdout.println("Threads " + Thread.activeCount())
- }
- }finally{
- originalStdout.println("Exiting Server... " + System.currentTimeMillis())
- lock.release()
- lockChannel.close()
- pidRaf.close()
- pidFile.delete()
- }
- originalStdout.println("END")
- }
- def pollOrRun(originalStdout: PrintStream) = {
- if (!runFile.exists()) Thread.sleep(30)
- else try{
- originalStdout.println("Handling Run")
- handleRun(originalStdout)
- }catch{
- case e: Throwable =>
- originalStdout.println("Run Failed")
- e.printStackTrace(originalStdout)
- }finally{
- lastRun = System.currentTimeMillis()
- originalStdout.println("Updating lastRun " + lastRun)
- }
- }
- def handleRun(originalStdout: PrintStream) = {
- currentIn = new FileInputStream(inFile)
- currentOutErr = new FileOutputStream(outErrFile)
- currentMeta = new FileOutputStream(metaFile)
- val lockChannel = lockFile.getChannel
- val probe = new ProbeThread(lockChannel, Thread.currentThread(), originalStdout)
- val probeThread = new Thread(probe)
- probeThread.start()
- val argStream = new FileInputStream(runFile)
- val interactive = argStream.read() != 0
- val argsLength = argStream.read()
- val args = Array.fill(argsLength){
- val n = argStream.read()
- val arr = new Array[Byte](n)
- argStream.read(arr)
- new String(arr)
- }
- originalStdout.println("Parsed Args " + args.toList)
- try {
- originalStdout.println("Running Main")
- val (_, newStateCache) = mill.Main.main0(
- args,
- stateCache,
- interactive,
- () => {
- channel.tryLock() match{
- case null =>
- false
- case lock =>
- lock.release()
- true
- }
- },
- new ProxyInputStream(currentIn),
- new PrintStream(new ProxyOutputStream(currentOutErr, currentMeta, 0), true),
- new PrintStream(new ProxyOutputStream(currentOutErr, currentMeta, 1), true)
- )
- originalStdout.println("Finished Main")
- stateCache = newStateCache
- } catch{case MainRunner.WatchInterrupted(sc) =>
- stateCache = sc
- } finally{
-// lockChannel.close()
-// lockFile.close()
- probe.running = false
- probeThread.join()
- runFile.delete()
-
- }
- }
-} \ No newline at end of file
diff --git a/main/src/mill/main/MainModule.scala b/main/src/mill/main/MainModule.scala
index d686b1db..feafd35d 100644
--- a/main/src/mill/main/MainModule.scala
+++ b/main/src/mill/main/MainModule.scala
@@ -79,7 +79,7 @@ trait MainModule extends mill.Module{
// When using `show`, redirect all stdout of the evaluated tasks so the
// printed JSON is the only thing printed to stdout.
log = evaluator.log match{
- case PrintLogger(c1, c2, o, i, e) => PrintLogger(c1, c2, e, i, e)
+ case PrintLogger(c1, c2, o, i, e, in) => PrintLogger(c1, c2, e, i, e, in)
case l => l
}
),
diff --git a/main/src/mill/main/MainRunner.scala b/main/src/mill/main/MainRunner.scala
index 2bd7fbc6..907f8a7b 100644
--- a/main/src/mill/main/MainRunner.scala
+++ b/main/src/mill/main/MainRunner.scala
@@ -5,12 +5,9 @@ import ammonite.interp.{Interpreter, Preprocessor}
import ammonite.ops.Path
import ammonite.util._
import mill.eval.{Evaluator, PathRef}
-import mill.main.MainRunner.WatchInterrupted
+
import mill.util.PrintLogger
-object MainRunner{
- case class WatchInterrupted(stateCache: Option[Evaluator.State]) extends Exception
-}
/**
* Customized version of [[ammonite.MainRunner]], allowing us to run Mill
@@ -37,7 +34,7 @@ class MainRunner(val config: ammonite.main.Cli.Config,
}
while(statAll()) {
- if (interruptWatch()) throw WatchInterrupted(stateCache)
+ if (interruptWatch()) throw mill.clientserver.WatchInterrupted(stateCache)
Thread.sleep(100)
}
}
@@ -58,7 +55,8 @@ class MainRunner(val config: ammonite.main.Cli.Config,
colors,
outprintStream,
errPrintStream,
- errPrintStream
+ errPrintStream,
+ stdIn
)
)
diff --git a/main/src/mill/main/ReplApplyHandler.scala b/main/src/mill/main/ReplApplyHandler.scala
index 5681d75b..a2b042ad 100644
--- a/main/src/mill/main/ReplApplyHandler.scala
+++ b/main/src/mill/main/ReplApplyHandler.scala
@@ -24,7 +24,8 @@ object ReplApplyHandler{
colors,
System.out,
System.err,
- System.err
+ System.err,
+ System.in
)
)
)
diff --git a/main/src/mill/modules/Jvm.scala b/main/src/mill/modules/Jvm.scala
index 297dcf1f..2d351c18 100644
--- a/main/src/mill/modules/Jvm.scala
+++ b/main/src/mill/modules/Jvm.scala
@@ -7,6 +7,7 @@ import java.nio.file.attribute.PosixFilePermission
import java.util.jar.{JarEntry, JarFile, JarOutputStream}
import ammonite.ops._
+import mill.clientserver.{ClientInputPumper, ClientServer}
import mill.define.Task
import mill.eval.PathRef
import mill.util.{Ctx, Loose}
@@ -27,6 +28,7 @@ object Jvm {
envArgs: Map[String, String] = Map.empty,
mainArgs: Seq[String] = Seq.empty,
workingDir: Path = null): Unit = {
+
import ammonite.ops.ImplicitWd._
val commandArgs =
Vector("java") ++
@@ -34,7 +36,32 @@ object Jvm {
Vector("-cp", classPath.mkString(":"), mainClass) ++
mainArgs
- %.copy(envArgs = envArgs)(commandArgs)(workingDir)
+ val builder = new java.lang.ProcessBuilder()
+ import collection.JavaConverters._
+ for ((k, v) <- envArgs){
+ if (v != null) builder.environment().put(k, v)
+ else builder.environment().remove(k)
+ }
+ builder.directory(workingDir.toIO)
+
+ val process =
+ builder
+ .command(commandArgs:_*)
+ .start()
+
+ val sources = Seq(
+ process.getInputStream -> System.out,
+ process.getErrorStream -> System.err,
+ System.in -> process.getOutputStream
+ )
+
+ for((std, dest) <- sources){
+ new Thread(new ClientInputPumper(std, dest)).start()
+ }
+
+ val exitCode = process.waitFor()
+ if (exitCode == 0) ()
+ else throw InteractiveShelloutException()
}
def runLocal(mainClass: String,
@@ -253,5 +280,28 @@ object Jvm {
}
PathRef(outputPath)
}
+ def launcherShellScript(mainClass: String,
+ classPath: Agg[String],
+ jvmArgs: Seq[String]) = {
+ s"""#!/usr/bin/env sh
+ |
+ |exec java ${jvmArgs.mkString(" ")} $$JAVA_OPTS -cp "${classPath.mkString(":")}" $mainClass "$$@"
+ """.stripMargin
+ }
+ def createLauncher(mainClass: String,
+ classPath: Agg[Path],
+ jvmArgs: Seq[String])
+ (implicit ctx: Ctx.Dest)= {
+ val outputPath = ctx.dest / "run"
+
+ write(outputPath, launcherShellScript(mainClass, classPath.map(_.toString), jvmArgs))
+
+ val perms = java.nio.file.Files.getPosixFilePermissions(outputPath.toNIO)
+ perms.add(PosixFilePermission.GROUP_EXECUTE)
+ perms.add(PosixFilePermission.OWNER_EXECUTE)
+ perms.add(PosixFilePermission.OTHERS_EXECUTE)
+ java.nio.file.Files.setPosixFilePermissions(outputPath.toNIO, perms)
+ PathRef(outputPath)
+ }
}