diff options
author | Li Haoyi <haoyi.sg@gmail.com> | 2018-02-21 21:05:37 -0800 |
---|---|---|
committer | Li Haoyi <haoyi.sg@gmail.com> | 2018-02-24 17:13:03 -0800 |
commit | c98408adf2d96928fe227a740631a8efd8e0c339 (patch) | |
tree | 5a36d9ee7d8ee6e1f7f9247cd7ddd31b194df5df | |
parent | 51db54d4f1deefb34b9d7f6581611ae166652493 (diff) | |
download | mill-c98408adf2d96928fe227a740631a8efd8e0c339.tar.gz mill-c98408adf2d96928fe227a740631a8efd8e0c339.tar.bz2 mill-c98408adf2d96928fe227a740631a8efd8e0c339.zip |
Clean up the provisional client-server code with unit tests and proper file-sockets
Seems to work well enough for interactive scala consoles, though still not Ammonite
Also Added ScalaModule#launcher and re-worked our build.sc file to use it
24 files changed, 776 insertions, 402 deletions
@@ -63,6 +63,15 @@ def ammoniteRun(hole: SettingKey[File], args: String => List[String], suffix: St target } +lazy val clientserver = project + .settings( + sharedSettings, + pluginSettings, + name := "mill-core", + libraryDependencies ++= Seq( + "org.scala-sbt.ipcsocket" % "ipcsocket" % "1.0.0" + ) + ) lazy val core = project .dependsOn(moduledefs) @@ -83,7 +92,7 @@ lazy val core = project ) lazy val main = project - .dependsOn(core) + .dependsOn(core, clientserver) .settings( sharedSettings, pluginSettings, @@ -1,11 +1,13 @@ import $file.shared import $file.upload import java.io.File +import java.nio.file.attribute.PosixFilePermission import ammonite.ops._ import coursier.maven.MavenRepository import mill._ -import mill.scalalib._, publish._ +import mill.scalalib._ +import publish._ import mill.modules.Jvm.createAssembly import upickle.Js trait MillPublishModule extends PublishModule{ @@ -60,6 +62,13 @@ trait MillModule extends MillPublishModule{ outer => } } +object clientserver extends MillModule{ + def ivyDeps = Agg( + ivy"org.scala-sbt.ipcsocket:ipcsocket:1.0.0" + ) + val test = new Tests(implicitly) +} + object core extends MillModule { def moduleDeps = Seq(moduledefs) @@ -78,7 +87,8 @@ object core extends MillModule { } object main extends MillModule { - def moduleDeps = Seq(core) + def moduleDeps = Seq(core, clientserver) + def compileIvyDeps = Agg( ivy"org.scala-lang:scala-reflect:${scalaVersion()}" @@ -191,42 +201,36 @@ object integration extends MillModule{ def forkArgs = testArgs() } -val assemblyProjects = Seq(scalalib, scalajslib) - -def assemblyClasspath = mill.define.Task.traverse(assemblyProjects)(_.runClasspath) -def assemblyBase(classpath: Agg[Path], extraArgs: String) - (implicit ctx: mill.util.Ctx.Dest) = { - createAssembly( - classpath, - prependShellScript = - "#!/usr/bin/env sh\n" + - s"""exec java $extraArgs $$JAVA_OPTS -cp "$$0" mill.Main "$$@" """ -// s"""exec java $extraArgs $$JAVA_OPTS -cp "$$0" mill.Client "$$@" """ - ) +object dev extends MillModule{ + def moduleDeps = Seq(scalalib, scalajslib) + def forkArgs = T{ + scalalib.testArgs() ++ scalajslib.testArgs() ++ scalaworker.testArgs() + } + def mainClass = Some("mill.ClientMain") + + def run(wd: Path, args: String*) = T.command{ + mill.modules.Jvm.interactiveSubprocess( + finalMainClass(), + runClasspath().map(_.path), + forkArgs(), + forkEnv(), + args, + workingDir = ammonite.ops.pwd + ) + } } -def devAssembly = T{ - assemblyBase( - Agg.from(assemblyClasspath().flatten.map(_.path)), - (scalalib.testArgs() ++ scalajslib.testArgs() ++ scalaworker.testArgs()).mkString(" ") - ) -} -def dev(wd: Path, args: String*) = T.command{ - mill.modules.Jvm.interactiveSubprocess( - "mill.Main", - Agg.from(assemblyClasspath().flatten.map(_.path)), - jvmArgs = scalalib.testArgs() ++ scalajslib.testArgs() ++ scalaworker.testArgs(), - mainArgs = args, - workingDir = wd - ) -} +def release = T{ + createAssembly( + dev.runClasspath().map(_.path), + prependShellScript = mill.modules.Jvm.launcherShellScript( + dev.mainClass().get, + Agg("$0"), + Seq("-DMILL_VERSION=" + publishVersion()._2) + ) -def releaseAssembly = T{ - assemblyBase( - Agg.from(assemblyClasspath().flatten.map(_.path)), - "-DMILL_VERSION=" + publishVersion()._2 ) } @@ -262,15 +266,15 @@ def publishVersion = T.input{ } def uploadToGithub(authKey: String) = T.command{ - val (release, label) = publishVersion() + val (releaseTag, label) = publishVersion() - if (release != "unstable"){ + if (releaseTag != "unstable"){ scalaj.http.Http("https://api.github.com/repos/lihaoyi/mill/releases") .postData( upickle.json.write( Js.Obj( - "tag_name" -> Js.Str(release), - "name" -> Js.Str(release) + "tag_name" -> Js.Str(releaseTag), + "name" -> Js.Str(releaseTag) ) ) ) @@ -278,5 +282,5 @@ def uploadToGithub(authKey: String) = T.command{ .asString } - upload.apply(releaseAssembly().path, release, label, authKey) + upload.apply(release().path, releaseTag, label, authKey) } diff --git a/ci/publish-local.sh b/ci/publish-local.sh index a26a590d..0d7559e3 100755 --- a/ci/publish-local.sh +++ b/ci/publish-local.sh @@ -6,6 +6,6 @@ set -eux sbt bin/test:assembly # Build Mill using SBT -target/bin/mill all __.publishLocal releaseAssembly +target/bin/mill all __.publishLocal release -mv out/releaseAssembly/dest/out.jar ~/mill-release +mv out/release/dest/out.jar ~/mill-release diff --git a/ci/test-mill-built.sh b/ci/test-mill-built.sh index c63e9804..44af3876 100755 --- a/ci/test-mill-built.sh +++ b/ci/test-mill-built.sh @@ -8,10 +8,10 @@ git clean -xdf ci/publish-local.sh # Build Mill using SBT -target/bin/mill devAssembly +target/bin/mill dev.assembly # Second build & run tests using Mill -out/devAssembly/dest/out.jar all {main,scalalib,scalajslib}.test devAssembly -out/devAssembly/dest/out.jar integration.test "mill.integration.forked.{AmmoniteTests,BetterFilesTests}" -out/devAssembly/dest/out.jar devAssembly +out/dev/assembly/dest/out.jar all {clientserver,main,scalalib,scalajslib}.test +out/dev/assembly/dest/out.jar integration.test "mill.integration.forked.{AmmoniteTests,BetterFilesTests}" +out/dev/assembly/dest/out.jar dev.assembly diff --git a/ci/test-mill-release.sh b/ci/test-mill-release.sh index 0b874d48..838d1960 100755 --- a/ci/test-mill-release.sh +++ b/ci/test-mill-release.sh @@ -11,6 +11,6 @@ git clean -xdf # Second build & run tests using Mill -~/mill-release all {main,scalalib,scalajslib}.test +~/mill-release all {clientserver,main,scalalib,scalajslib}.test ~/mill-release integration.test "mill.integration.forked.{AcyclicTests,JawnTests,UpickleTests}" -~/mill-release devAssembly +~/mill-release dev.assembly diff --git a/ci/test-sbt-built.sh b/ci/test-sbt-built.sh index e93eb4fc..3a85d345 100755 --- a/ci/test-sbt-built.sh +++ b/ci/test-sbt-built.sh @@ -8,6 +8,6 @@ git clean -xdf sbt bin/test:assembly # Run tests using Mill built using SBT -target/bin/mill all {main,scalalib,scalajslib}.test +target/bin/mill all {clientserver,main,scalalib,scalajslib}.test target/bin/mill integration.test "mill.integration.local.{AcyclicTests,JawnTests,UpickleTests}" -target/bin/mill devAssembly +target/bin/mill dev.assembly diff --git a/ci/test-sbt.sh b/ci/test-sbt.sh index 8b728916..a57b06e2 100755 --- a/ci/test-sbt.sh +++ b/ci/test-sbt.sh @@ -6,6 +6,6 @@ set -eux git clean -xdf # First build & run tests using SBT -sbt core/test main/test scalalib/test scalajslib/test +sbt core/test clientserver/test main/test scalalib/test scalajslib/test sbt "integration/test-only -- mill.integration.local.{AmmoniteTests,BetterFilesTests}" sbt bin/test:assembly diff --git a/clientserver/src/mill/clientserver/Client.scala b/clientserver/src/mill/clientserver/Client.scala new file mode 100644 index 00000000..dcf65271 --- /dev/null +++ b/clientserver/src/mill/clientserver/Client.scala @@ -0,0 +1,57 @@ +package mill.clientserver + +import java.io._ + +import org.scalasbt.ipcsocket.UnixDomainSocket + +object Client{ + def WithLock[T](index: Int)(f: String => T): T = { + val lockBase = "out/mill-worker-" + index + new java.io.File(lockBase).mkdirs() + val lockFile = new RandomAccessFile(lockBase+ "/clientLock", "rw") + val channel = lockFile.getChannel + channel.tryLock() match{ + case null => + lockFile.close() + channel.close() + if (index < 5) WithLock(index + 1)(f) + else throw new Exception("Reached max process limit: " + 5) + case locked => + try f(lockBase) + finally{ + locked.release() + lockFile.close() + channel.close() + } + } + } +} + +class Client(lockBase: String, + initServer: () => Unit, + locks: Locks, + stdin: InputStream, + stdout: OutputStream, + stderr: OutputStream) extends ClientServer(lockBase){ + def run(args: Array[String]) = { + + val f = new FileOutputStream(runFile) + ClientServer.writeArgs(System.console() != null, args, f) + f.close() + if (locks.processLock.probe()) initServer() + while(locks.processLock.probe()) Thread.sleep(3) + + val ioSocket = ClientServer.retry(1000, new UnixDomainSocket(ioPath)) + val outErr = ioSocket.getInputStream + val in = ioSocket.getOutputStream + val outPump = new ClientOutputPumper(outErr, stdout, stderr) + val inPump = new ClientInputPumper(stdin, in) + val outThread = new Thread(outPump) + outThread.setDaemon(true) + val inThread = new Thread(inPump) + inThread.setDaemon(true) + outThread.start() + inThread.start() + locks.serverLock.await() + } +} diff --git a/clientserver/src/mill/clientserver/ClientServer.scala b/clientserver/src/mill/clientserver/ClientServer.scala new file mode 100644 index 00000000..2cc38859 --- /dev/null +++ b/clientserver/src/mill/clientserver/ClientServer.scala @@ -0,0 +1,139 @@ +package mill.clientserver + +import java.io.{FileInputStream, InputStream, OutputStream, RandomAccessFile} +import java.nio.channels.FileChannel + +import scala.annotation.tailrec + +class ClientServer(lockBase: String){ + val ioPath = lockBase + "/io" + val logFile = new java.io.File(lockBase + "/log") + val runFile = new java.io.File(lockBase + "/run") +} + +object ClientServer{ + def parseArgs(argStream: InputStream) = { + val interactive = argStream.read() != 0 + val argsLength = argStream.read() + val args = Array.fill(argsLength){ + val n = argStream.read() + val arr = new Array[Byte](n) + argStream.read(arr) + new String(arr) + } + (interactive, args) + } + def writeArgs(interactive: Boolean, args: Array[String], argStream: OutputStream) = { + argStream.write(if (interactive) 1 else 0) + argStream.write(args.length) + var i = 0 + while (i < args.length){ + argStream.write(args(i).length) + argStream.write(args(i).getBytes) + i += 1 + } + } + @tailrec def retry[T](millis: Long, t: => T): T = { + val current = System.currentTimeMillis() + val res = + try Some(t) + catch{case e: Throwable if System.currentTimeMillis() < current + millis => + None + } + res match{ + case Some(t) => t + case None => + Thread.sleep(1) + retry(millis - (System.currentTimeMillis() - current), t) + } + } + + def interruptWith[T](millis: Int, close: => Unit)(t: => T): T = { + var int = true + new Thread(() => { + Thread.sleep(millis) + if (int) close + }).start() + + try t + finally { + + int = false + } + } + + def polling[T](probe: => Boolean, cb: () => Unit)(t: => T): T = { + var probing = true + val probeThread = new Thread(() => while(probing){ + if (probe){ + probing = false + cb() + } + Thread.sleep(1000) + }) + probeThread.start() + try t + finally probing = false + } +} +object ProxyOutputStream{ + val lock = new Object +} +class ProxyOutputStream(x: => java.io.OutputStream, + key: Int) extends java.io.OutputStream { + override def write(b: Int) = ProxyOutputStream.lock.synchronized{ + x.write(key) + x.write(b) + } +} +class ProxyInputStream(x: => java.io.InputStream) extends java.io.InputStream{ + def read() = x.read() + override def read(b: Array[Byte], off: Int, len: Int) = x.read(b, off, len) + override def read(b: Array[Byte]) = x.read(b) +} + +class ClientInputPumper(src: InputStream, dest: OutputStream) extends Runnable{ + var running = true + def run() = { + val buffer = new Array[Byte](1024) + while(running){ + val n = src.read(buffer) + if (n == -1) running = false + else { + dest.write(buffer, 0, n) + dest.flush() + } + } + } + +} +class ClientOutputPumper(src: InputStream, dest1: OutputStream, dest2: OutputStream) extends Runnable{ + var running = true + def run() = { + val buffer = new Array[Byte](1024) + var state = 0 + while(running){ + val n = src.read(buffer) + if (n == -1) running = false + else { + var i = 0 + while (i < n){ + state match{ + case 0 => state = buffer(i) + 1 + case 1 => + dest1.write(buffer(i)) + state = 0 + case 2 => + dest2.write(buffer(i)) + state = 0 + } + + i += 1 + } + dest1.flush() + dest2.flush() + } + } + } + +}
\ No newline at end of file diff --git a/clientserver/src/mill/clientserver/Locks.scala b/clientserver/src/mill/clientserver/Locks.scala new file mode 100644 index 00000000..d1644719 --- /dev/null +++ b/clientserver/src/mill/clientserver/Locks.scala @@ -0,0 +1,103 @@ +package mill.clientserver + +import java.io.RandomAccessFile +import java.nio.channels.FileChannel +import java.util.concurrent.locks.{ReadWriteLock, ReentrantLock} + + +trait Lock{ + def lock(): Locked + def lockBlock[T](t: => T): T = { + val l = lock() + try t + finally l.release() + } + def tryLockBlock[T](t: => T): Option[T] = { + tryLock() match{ + case None => + None + case Some(l) => + try Some(t) + finally l.release() + } + + } + def tryLock(): Option[Locked] + def await(): Unit = { + val l = lock() + l.release() + } + + /** + * Returns `true` if the lock is *available for taking* + */ + def probe(): Boolean +} +trait Locked{ + def release(): Unit +} +trait Locks{ + val processLock: Lock + val serverLock: Lock + val clientLock: Lock +} +class FileLocked(lock: java.nio.channels.FileLock) extends Locked{ + def release() = { + lock.release() + } +} + +class FileLock(path: String) extends Lock{ + + val raf = new RandomAccessFile(path, "rw") + val chan = raf.getChannel + def lock() = { + val lock = chan.lock() + new FileLocked(lock) + } + def tryLock() = { + chan.tryLock() match{ + case null => None + case lock => Some(new FileLocked(lock)) + } + } + def probe(): Boolean = tryLock() match{ + case None => false + case Some(locked) => + locked.release() + true + } +} +class FileLocks(lockBase: String) extends Locks{ + val processLock = new FileLock(lockBase + "/pid") + + val serverLock = new FileLock(lockBase + "/serverLock") + + val clientLock = new FileLock(lockBase + "/clientLock") +} +class MemoryLocked(l: java.util.concurrent.locks.Lock) extends Locked{ + def release() = l.unlock() +} + +class MemoryLock() extends Lock{ + val innerLock = new ReentrantLock(true) + + def probe() = !innerLock.isLocked + def lock() = { + innerLock.lock() + new MemoryLocked(innerLock) + } + def tryLock() = { + innerLock.tryLock() match{ + case false => None + case true => Some(new MemoryLocked(innerLock)) + } + } +} +class MemoryLocks() extends Locks{ + val processLock = new MemoryLock() + + val serverLock = new MemoryLock() + + val clientLock = new MemoryLock() +}
\ No newline at end of file diff --git a/clientserver/src/mill/clientserver/Server.scala b/clientserver/src/mill/clientserver/Server.scala new file mode 100644 index 00000000..ad2e35e4 --- /dev/null +++ b/clientserver/src/mill/clientserver/Server.scala @@ -0,0 +1,104 @@ +package mill.clientserver + +import java.io._ +import java.net.Socket + +import org.scalasbt.ipcsocket.UnixDomainServerSocket + +trait ServerMain[T]{ + def main(args0: Array[String]): Unit = { + new Server( + args0(0), + this, + () => System.exit(0), + () => System.currentTimeMillis(), + new FileLocks(args0(0)) + ).run() + } + var stateCache = Option.empty[T] + def main0(args: Array[String], + stateCache: Option[T], + mainInteractive: Boolean, + watchInterrupted: () => Boolean, + stdin: InputStream, + stdout: PrintStream, + stderr: PrintStream): (Boolean, Option[T]) +} + + +class Server[T](lockBase: String, + sm: ServerMain[T], + interruptServer: () => Unit, + currentTimeMillis: () => Long, + locks: Locks) extends ClientServer(lockBase){ + + val originalStdout = System.out + def run() = { + locks.processLock.tryLockBlock{ + var lastRun = currentTimeMillis() + while (currentTimeMillis() - lastRun < 60000) locks.serverLock.lockBlock{ + new File(ioPath).delete() + val ioSocket = new UnixDomainServerSocket(ioPath) + val sockOpt = ClientServer.interruptWith( + 1000, + ioSocket.close() + ){ + try Some(ioSocket.accept()) + catch{ case e: IOException => None} + } + + sockOpt.foreach{sock => + try handleRun(sock) + catch{case e: Throwable => e.printStackTrace(originalStdout) } + finally lastRun = currentTimeMillis() + } + } + }.getOrElse(throw new Exception("PID already present")) + } + + def handleRun(clientSocket: Socket) = { + + val currentOutErr = clientSocket.getOutputStream + val socketIn = clientSocket.getInputStream + val argStream = new FileInputStream(runFile) + val (interactive, args) = ClientServer.parseArgs(argStream) + argStream.close() + + var done = false + val t = new Thread(() => + + try { + val stdout = new PrintStream(new ProxyOutputStream(currentOutErr, 0), true) + val stderr = new PrintStream(new ProxyOutputStream(currentOutErr, 1), true) + val (_, newStateCache) = sm.main0( + args, + sm.stateCache, + interactive, + () => !locks.clientLock.probe(), + socketIn, + stdout, stderr + ) + + sm.stateCache = newStateCache + } catch{case WatchInterrupted(sc: Option[T]) => + sm.stateCache = sc + } finally{ + done = true + } + ) + + t.start() + + // We cannot simply use Lock#await here, because the filesystem doesn't + // realize the clientLock/serverLock are held by different threads in the + // two processes and gives a spurious deadlock error + while(!done && !locks.clientLock.probe()) { + Thread.sleep(3) + } + + t.interrupt() + t.stop() + clientSocket.close() + } +} +case class WatchInterrupted[T](stateCache: Option[T]) extends Exception
\ No newline at end of file diff --git a/clientserver/test/src/mill/clientserver/ClientServerTests.scala b/clientserver/test/src/mill/clientserver/ClientServerTests.scala new file mode 100644 index 00000000..ecf09ab3 --- /dev/null +++ b/clientserver/test/src/mill/clientserver/ClientServerTests.scala @@ -0,0 +1,118 @@ +package mill.clientserver +import java.io._ +import java.nio.file.Path + +import utest._ +class EchoServer extends ServerMain[Int]{ + def main0(args: Array[String], + stateCache: Option[Int], + mainInteractive: Boolean, + watchInterrupted: () => Boolean, + stdin: InputStream, + stdout: PrintStream, + stderr: PrintStream) = { + + val reader = new BufferedReader(new InputStreamReader(stdin)) + val str = reader.readLine() + stdout.println(str + args(0)) + stdout.flush() + stderr.println(str.toUpperCase + args(0)) + stderr.flush() + (true, None) + } +} + +object ClientServerTests extends TestSuite{ + def initStreams() = { + val in = new ByteArrayInputStream("hello\n".getBytes()) + val out = new ByteArrayOutputStream() + val err = new ByteArrayOutputStream() + (in, out, err) + } + def init() = { + val tmpDir = java.nio.file.Files.createTempDirectory("") + val locks = new MemoryLocks() + + (tmpDir, locks) + } + + def tests = Tests{ + 'hello - { + var currentTimeMillis = 0 + val (tmpDir, locks) = init() + + def spawnEchoServer() = { + new Thread(() => new Server( + tmpDir.toString, + new EchoServer(), + () => (), + () => currentTimeMillis, + locks + ).run()).start() + } + + + def runClient(arg: String) = { + val (in, out, err) = initStreams() + locks.clientLock.lockBlock{ + val c = new Client( + tmpDir.toString, + () => spawnEchoServer(), + locks, + in, + out, + err + ) + c.run(Array(arg)) + (new String(out.toByteArray), new String(err.toByteArray)) + } + } + + // Make sure the simple "have the client start a server and + // exchange one message" workflow works from end to end. + + assert( + locks.clientLock.probe(), + locks.serverLock.probe(), + locks.processLock.probe() + ) + + val (out1, err1) = runClient("world") + + assert( + out1 == "helloworld\n", + err1 == "HELLOworld\n" + ) + + assert( + locks.clientLock.probe(), + !locks.serverLock.probe(), + !locks.processLock.probe() + ) + + // A seecond client in sequence connect to the same server + val (out2, err2) = runClient(" WORLD") + + assert( + out2 == "hello WORLD\n", + err2 == "HELLO WORLD\n" + ) + + // Make sure the server times out of not used for a while + currentTimeMillis += 60001 + Thread.sleep(2000) + assert( + locks.clientLock.probe(), + locks.serverLock.probe(), + locks.processLock.probe() + ) + + // Have a third client spawn/connect-to a new server at the same path + val (out3, err3) = runClient(" World") + assert( + out3 == "hello World\n", + err3 == "HELLO World\n" + ) + } + } +} diff --git a/core/src/mill/eval/Evaluator.scala b/core/src/mill/eval/Evaluator.scala index 85dcf877..fd9d6bbe 100644 --- a/core/src/mill/eval/Evaluator.scala +++ b/core/src/mill/eval/Evaluator.scala @@ -286,21 +286,24 @@ case class Evaluator[T](outPath: Path, ) val out = System.out + val in = System.in val err = System.err try{ + System.setIn(multiLogger.inStream) System.setErr(multiLogger.errorStream) System.setOut(multiLogger.outputStream) - Console.withOut(multiLogger.outputStream){ - Console.withErr(multiLogger.errorStream){ - task.evaluate(args) + Console.withIn(multiLogger.inStream){ + Console.withOut(multiLogger.outputStream){ + Console.withErr(multiLogger.errorStream){ + task.evaluate(args) + } } } - }catch{ case NonFatal(e) => - - Result.Exception(e, new OuterStack(currentStack)) + }catch{ case NonFatal(e) => Result.Exception(e, new OuterStack(currentStack)) }finally{ System.setErr(err) System.setOut(out) + System.setIn(in) } } diff --git a/core/src/mill/util/Logger.scala b/core/src/mill/util/Logger.scala index 29cee23c..55ea84cc 100644 --- a/core/src/mill/util/Logger.scala +++ b/core/src/mill/util/Logger.scala @@ -29,6 +29,7 @@ trait Logger { def colored: Boolean val errorStream: PrintStream val outputStream: PrintStream + val inStream: InputStream def info(s: String): Unit def error(s: String): Unit def ticker(s: String): Unit @@ -39,6 +40,7 @@ object DummyLogger extends Logger { def colored = false object errorStream extends PrintStream(_ => ()) object outputStream extends PrintStream(_ => ()) + val inStream = new ByteArrayInputStream(Array()) def info(s: String) = () def error(s: String) = () def ticker(s: String) = () @@ -80,7 +82,8 @@ case class PrintLogger(colored: Boolean, colors: ammonite.util.Colors, outStream: PrintStream, infoStream: PrintStream, - errStream: PrintStream) extends Logger { + errStream: PrintStream, + inStream: InputStream) extends Logger { var printState: PrintState = PrintState.Newline @@ -133,6 +136,7 @@ case class FileLogger(colored: Boolean, file: Path) extends Logger { def info(s: String) = outputStream.println(s) def error(s: String) = outputStream.println(s) def ticker(s: String) = outputStream.println(s) + val inStream: InputStream = new ByteArrayInputStream(Array()) override def close() = { if (outputStreamUsed) outputStream.close() @@ -150,6 +154,10 @@ case class MultiLogger(colored: Boolean, streams: Logger*) extends Logger { override def flush() = streams.foreach(_.outputStream.flush()) override def close() = streams.foreach(_.outputStream.close()) } + lazy val inStream = streams.collect{case t: PrintLogger => t}.headOption match{ + case Some(x) => x.inStream + case None => new ByteArrayInputStream(Array()) + } def info(s: String) = streams.foreach(_.info(s)) def error(s: String) = streams.foreach(_.error(s)) diff --git a/main/src/mill/Main.scala b/main/src/mill/Main.scala index d2d2f66f..ee055d64 100644 --- a/main/src/mill/Main.scala +++ b/main/src/mill/Main.scala @@ -2,12 +2,82 @@ package mill import java.io.{InputStream, OutputStream, PrintStream} - import ammonite.main.Cli.{formatBlock, genericSignature, replSignature} import ammonite.ops._ import ammonite.util.Util +import mill.clientserver.{Client, FileLocks} import mill.eval.Evaluator + +object ClientMain { + def initServer(lockBase: String) = { + val selfJars = new java.lang.StringBuilder + var current = getClass.getClassLoader + while(current != null){ + getClass.getClassLoader match{ + case e: java.net.URLClassLoader => + val urls = e.getURLs + var i = 0 + while(i < urls.length){ + if (selfJars.length() != 0) selfJars.append(':') + selfJars.append(urls(i)) + i += 1 + } + case _ => + } + current = current.getParent + } + + val l = new java.util.ArrayList[String] + l.add("java") + val props = System.getProperties + val keys = props.stringPropertyNames().iterator() + while(keys.hasNext){ + val k = keys.next() + if (k.startsWith("MILL_")) l.add("-D" + k + "=" + props.getProperty(k)) + } + l.add("-cp") + l.add(selfJars.toString) + l.add("mill.ServerMain") + l.add(lockBase) + new java.lang.ProcessBuilder() + .command(l) + .redirectOutput(new java.io.File(lockBase + "/logs")) + .redirectError(new java.io.File(lockBase + "/logs")) + .start() + } + def main(args: Array[String]): Unit = { + Client.WithLock(1) { lockBase => + val c = new Client( + lockBase, + () => initServer(lockBase), + new FileLocks(lockBase), + System.in, + System.out, + System.err + ) + c.run(args) + } + System.exit(0) + } +} +object ServerMain extends mill.clientserver.ServerMain[Evaluator.State]{ + def main0(args: Array[String], + stateCache: Option[Evaluator.State], + mainInteractive: Boolean, + watchInterrupted: () => Boolean, + stdin: InputStream, + stdout: PrintStream, + stderr: PrintStream) = Main.main0( + args, + stateCache, + mainInteractive, + watchInterrupted, + stdin, + stdout, + stderr + ) +} object Main { def main(args: Array[String]): Unit = { diff --git a/main/src/mill/ServerClient.scala b/main/src/mill/ServerClient.scala deleted file mode 100644 index 383a8865..00000000 --- a/main/src/mill/ServerClient.scala +++ /dev/null @@ -1,312 +0,0 @@ -package mill - -import java.io._ -import java.nio.channels.FileChannel -import java.util - -import ammonite.main.Cli -import mill.eval.Evaluator -import mill.main.MainRunner - -class ServerClient(lockBase: String){ - val inFile = new java.io.File(lockBase + "/stdin") - val outErrFile = new java.io.File(lockBase + "/stdouterr") - val metaFile = new java.io.File(lockBase + "/stdmeta") - val logFile = new java.io.File(lockBase + "/log") - val runFile = new java.io.File(lockBase + "/run") - val tmpRunFile = new java.io.File(lockBase + "/run-tmp") - val pidFile = new java.io.File(lockBase + "/pid") -} -object Client{ - def WithLock[T](index: Int)(f: String => T): T = { - val lockBase = "out/mill-worker-" + index - new java.io.File(lockBase).mkdirs() - val lockFile = new RandomAccessFile(lockBase+ "/lock", "rw") - val channel = lockFile.getChannel - channel.tryLock() match{ - case null => - lockFile.close() - channel.close() - if (index < 5) WithLock(index + 1)(f) - else throw new Exception("Reached max process limit: " + 5) - case locked => - try f(lockBase) - finally{ - locked.release() - lockFile.close() - channel.close() - } - } - } - - def main(args: Array[String]): Unit = { - WithLock(1) { lockBase => - new Client(lockBase).run(args) - } - } -} - - -class Client(lockBase: String) extends ServerClient(lockBase){ - def run(args: Array[String]) = { - - outErrFile.delete() - metaFile.delete() - outErrFile.createNewFile() - metaFile.createNewFile() - inFile.createNewFile() - inFile.createNewFile() - logFile.createNewFile() - - val f = new FileOutputStream(tmpRunFile) - f.write(if (System.console() != null) 1 else 0) - f.write(args.length) - var i = 0 - while (i < args.length){ - f.write(args(i).length) - f.write(args(i).getBytes) - i += 1 - } - f.flush() - - tmpRunFile.renameTo(runFile) - val in = new FileOutputStream(inFile) - val outErr = new FileInputStream(outErrFile) - val meta = new FileInputStream(metaFile) - - val pidRaf = new RandomAccessFile(pidFile, "rw") - val pidLockChannel = pidRaf.getChannel - - if (!probeLock(pidLockChannel)){ - val selfJar = getClass.getProtectionDomain.getCodeSource.getLocation.toURI.getPath - - val l = new java.util.ArrayList[String] - l.add("java") - val props = System.getProperties - val keys = props.stringPropertyNames().iterator() - while(keys.hasNext){ - val k = keys.next() - if (k.startsWith("MILL_")) l.add("-D" + k + "=" + props.getProperty(k)) - } - l.add("-cp") - l.add(selfJar) - l.add("mill.Server") - l.add(lockBase.toString) - - new java.lang.ProcessBuilder() - .command(l) - .redirectInput(inFile) - .redirectOutput(logFile) - .redirectError(logFile) - .start() - } - - - while(!probeLock(pidLockChannel)) Thread.sleep(3) - - try { - val buffer = new Array[Byte](1024) - val metaBuffer = new Array[Byte](1024) - while ({ - Thread.sleep(3) - while ( { - forwardForked(buffer, metaBuffer, meta, outErr) | - forward(buffer, System.in, in) - }) () - - runFile.exists() && probeLock(pidLockChannel) - }) () - }finally { - pidLockChannel.close() - -// pidFile.delete() - inFile.delete() - outErrFile.delete() - metaFile.delete() - } - } - - def probeLock(pidLockChannel: FileChannel) = { - - pidLockChannel.tryLock() match{ - case null => true - case locked => - locked.release() - false - } - - } - def forwardForked(buffer: Array[Byte], - metaBuffer: Array[Byte], - meta: InputStream, - outErr: InputStream) = { - - if (outErr.available() > 0){ - val outErrN = outErr.read(buffer) - if (outErrN > 0) { - var metaN = 0 - while (metaN < outErrN) { - val delta = meta.read(metaBuffer, 0, outErrN - metaN) - if (delta > 0) { - var i = 0 - while (i < delta) { - metaBuffer(i) match { - case 0 => System.out.write(buffer(metaN + i)) - case 1 => System.err.write(buffer(metaN + i)) - } - i += 1 - } - metaN += delta - } - } - } - - true - }else false - } - def forward(buffer: Array[Byte], src: InputStream, dest: OutputStream) = { - if (src.available() != 0){ - val n = src.read(buffer) - dest.write(buffer, 0, n) - true - }else false - } -} - -class ProxyOutputStream(x: => java.io.OutputStream, - meta: => java.io.OutputStream, - key: Int) extends java.io.OutputStream { - override def write(b: Int) = Server.synchronized{ - x.write(b) - meta.write(key) - } -} -class ProxyInputStream(x: => java.io.InputStream) extends java.io.InputStream{ - def read() = x.read() - override def read(b: Array[Byte], off: Int, len: Int) = x.read(b, off, len) - override def read(b: Array[Byte]) = x.read(b) -} - -object Server{ - def main(args0: Array[String]): Unit = { - new Server(args0(0)).run() - - } -} - -class ProbeThread(lockChannel: FileChannel, - mainThread: Thread, - log: PrintStream) extends Runnable{ - var running = true - def run() = { - while({ - Thread.sleep(3) - lockChannel.tryLock() match{ - case null => true && running - case locked => - locked.release() - lockChannel.close() - System.exit(0) - false - } - })() - } -} - -class Server(lockBase: String) extends ServerClient(lockBase){ - var lastRun = System.currentTimeMillis() - var currentIn = System.in - var currentOutErr: OutputStream = System.out - var currentMeta: OutputStream = System.err - val lockFile = new RandomAccessFile(lockBase + "/lock", "rw") - val channel = lockFile.getChannel - var stateCache = Option.empty[Evaluator.State] - - def run() = { - val originalStdout = System.out - originalStdout.println("Initializing") - val pidRaf = new RandomAccessFile(pidFile, "rw") - val lockChannel = pidRaf.getChannel - val lock = lockChannel.tryLock() - if (lock == null) throw new Exception("PID already present") - originalStdout.println("Locked pid file") - try { - while (System.currentTimeMillis() - lastRun < 60000) { - pollOrRun(originalStdout) - originalStdout.println("Delta " + (System.currentTimeMillis() - lastRun)) - originalStdout.println("Threads " + Thread.activeCount()) - } - }finally{ - originalStdout.println("Exiting Server... " + System.currentTimeMillis()) - lock.release() - lockChannel.close() - pidRaf.close() - pidFile.delete() - } - originalStdout.println("END") - } - def pollOrRun(originalStdout: PrintStream) = { - if (!runFile.exists()) Thread.sleep(30) - else try{ - originalStdout.println("Handling Run") - handleRun(originalStdout) - }catch{ - case e: Throwable => - originalStdout.println("Run Failed") - e.printStackTrace(originalStdout) - }finally{ - lastRun = System.currentTimeMillis() - originalStdout.println("Updating lastRun " + lastRun) - } - } - def handleRun(originalStdout: PrintStream) = { - currentIn = new FileInputStream(inFile) - currentOutErr = new FileOutputStream(outErrFile) - currentMeta = new FileOutputStream(metaFile) - val lockChannel = lockFile.getChannel - val probe = new ProbeThread(lockChannel, Thread.currentThread(), originalStdout) - val probeThread = new Thread(probe) - probeThread.start() - val argStream = new FileInputStream(runFile) - val interactive = argStream.read() != 0 - val argsLength = argStream.read() - val args = Array.fill(argsLength){ - val n = argStream.read() - val arr = new Array[Byte](n) - argStream.read(arr) - new String(arr) - } - originalStdout.println("Parsed Args " + args.toList) - try { - originalStdout.println("Running Main") - val (_, newStateCache) = mill.Main.main0( - args, - stateCache, - interactive, - () => { - channel.tryLock() match{ - case null => - false - case lock => - lock.release() - true - } - }, - new ProxyInputStream(currentIn), - new PrintStream(new ProxyOutputStream(currentOutErr, currentMeta, 0), true), - new PrintStream(new ProxyOutputStream(currentOutErr, currentMeta, 1), true) - ) - originalStdout.println("Finished Main") - stateCache = newStateCache - } catch{case MainRunner.WatchInterrupted(sc) => - stateCache = sc - } finally{ -// lockChannel.close() -// lockFile.close() - probe.running = false - probeThread.join() - runFile.delete() - - } - } -}
\ No newline at end of file diff --git a/main/src/mill/main/MainModule.scala b/main/src/mill/main/MainModule.scala index d686b1db..feafd35d 100644 --- a/main/src/mill/main/MainModule.scala +++ b/main/src/mill/main/MainModule.scala @@ -79,7 +79,7 @@ trait MainModule extends mill.Module{ // When using `show`, redirect all stdout of the evaluated tasks so the // printed JSON is the only thing printed to stdout. log = evaluator.log match{ - case PrintLogger(c1, c2, o, i, e) => PrintLogger(c1, c2, e, i, e) + case PrintLogger(c1, c2, o, i, e, in) => PrintLogger(c1, c2, e, i, e, in) case l => l } ), diff --git a/main/src/mill/main/MainRunner.scala b/main/src/mill/main/MainRunner.scala index 2bd7fbc6..907f8a7b 100644 --- a/main/src/mill/main/MainRunner.scala +++ b/main/src/mill/main/MainRunner.scala @@ -5,12 +5,9 @@ import ammonite.interp.{Interpreter, Preprocessor} import ammonite.ops.Path import ammonite.util._ import mill.eval.{Evaluator, PathRef} -import mill.main.MainRunner.WatchInterrupted + import mill.util.PrintLogger -object MainRunner{ - case class WatchInterrupted(stateCache: Option[Evaluator.State]) extends Exception -} /** * Customized version of [[ammonite.MainRunner]], allowing us to run Mill @@ -37,7 +34,7 @@ class MainRunner(val config: ammonite.main.Cli.Config, } while(statAll()) { - if (interruptWatch()) throw WatchInterrupted(stateCache) + if (interruptWatch()) throw mill.clientserver.WatchInterrupted(stateCache) Thread.sleep(100) } } @@ -58,7 +55,8 @@ class MainRunner(val config: ammonite.main.Cli.Config, colors, outprintStream, errPrintStream, - errPrintStream + errPrintStream, + stdIn ) ) diff --git a/main/src/mill/main/ReplApplyHandler.scala b/main/src/mill/main/ReplApplyHandler.scala index 5681d75b..a2b042ad 100644 --- a/main/src/mill/main/ReplApplyHandler.scala +++ b/main/src/mill/main/ReplApplyHandler.scala @@ -24,7 +24,8 @@ object ReplApplyHandler{ colors, System.out, System.err, - System.err + System.err, + System.in ) ) ) diff --git a/main/src/mill/modules/Jvm.scala b/main/src/mill/modules/Jvm.scala index 297dcf1f..2d351c18 100644 --- a/main/src/mill/modules/Jvm.scala +++ b/main/src/mill/modules/Jvm.scala @@ -7,6 +7,7 @@ import java.nio.file.attribute.PosixFilePermission import java.util.jar.{JarEntry, JarFile, JarOutputStream} import ammonite.ops._ +import mill.clientserver.{ClientInputPumper, ClientServer} import mill.define.Task import mill.eval.PathRef import mill.util.{Ctx, Loose} @@ -27,6 +28,7 @@ object Jvm { envArgs: Map[String, String] = Map.empty, mainArgs: Seq[String] = Seq.empty, workingDir: Path = null): Unit = { + import ammonite.ops.ImplicitWd._ val commandArgs = Vector("java") ++ @@ -34,7 +36,32 @@ object Jvm { Vector("-cp", classPath.mkString(":"), mainClass) ++ mainArgs - %.copy(envArgs = envArgs)(commandArgs)(workingDir) + val builder = new java.lang.ProcessBuilder() + import collection.JavaConverters._ + for ((k, v) <- envArgs){ + if (v != null) builder.environment().put(k, v) + else builder.environment().remove(k) + } + builder.directory(workingDir.toIO) + + val process = + builder + .command(commandArgs:_*) + .start() + + val sources = Seq( + process.getInputStream -> System.out, + process.getErrorStream -> System.err, + System.in -> process.getOutputStream + ) + + for((std, dest) <- sources){ + new Thread(new ClientInputPumper(std, dest)).start() + } + + val exitCode = process.waitFor() + if (exitCode == 0) () + else throw InteractiveShelloutException() } def runLocal(mainClass: String, @@ -253,5 +280,28 @@ object Jvm { } PathRef(outputPath) } + def launcherShellScript(mainClass: String, + classPath: Agg[String], + jvmArgs: Seq[String]) = { + s"""#!/usr/bin/env sh + | + |exec java ${jvmArgs.mkString(" ")} $$JAVA_OPTS -cp "${classPath.mkString(":")}" $mainClass "$$@" + """.stripMargin + } + def createLauncher(mainClass: String, + classPath: Agg[Path], + jvmArgs: Seq[String]) + (implicit ctx: Ctx.Dest)= { + val outputPath = ctx.dest / "run" + + write(outputPath, launcherShellScript(mainClass, classPath.map(_.toString), jvmArgs)) + + val perms = java.nio.file.Files.getPosixFilePermissions(outputPath.toNIO) + perms.add(PosixFilePermission.GROUP_EXECUTE) + perms.add(PosixFilePermission.OWNER_EXECUTE) + perms.add(PosixFilePermission.OTHERS_EXECUTE) + java.nio.file.Files.setPosixFilePermissions(outputPath.toNIO, perms) + PathRef(outputPath) + } } @@ -31,7 +31,7 @@ Build a standalone executable jar: ```bash sbt bin/test:assembly -mill devAssembly +mill dev.assembly ``` Now you can re-build this very same project using the build.sc file, e.g. re-run @@ -41,27 +41,27 @@ e.g.: ```bash ./target/bin/mill core.compile -./out/devAssembly/dest/mill core.compile -./out/devAssembly/dest/mill main.test.compile -./out/devAssembly/dest/mill main.test -./out/devAssembly/dest/mill scalalib.assembly +./out/dev/assembly/dest/out.jar core.compile +./out/dev/assembly/dest/out.jar main.test.compile +./out/dev/assembly/dest/out.jar main.test +./out/dev/assembly/dest/out.jar scalalib.assembly ``` There is already a `watch` option that looks for changes on files, e.g.: ```bash ./target/bin/mill --watch core.compile -./out/devAssembly/dest/mill --watch core.compile +./out/dev/assembly/dest/out.jar --watch core.compile ``` You can get Mill to show the JSON-structured output for a particular `Target` or `Command` using the `show` flag: ```bash -./out/devAssembly/dest/mill show core.scalaVersion -./out/devAssembly/dest/mill show core.compile -./out/devAssembly/dest/mill show core.assemblyClasspath -./out/devAssembly/dest/mill show main.test +./out/dev/assembly/dest/out.jar show core.scalaVersion +./out/dev/assembly/dest/out.jar show core.compile +./out/dev/assembly/dest/out.jar show core.assemblyClasspath +./out/dev/assembly/dest/out.jar show main.test ``` Output will be generated into a the `./out` folder. @@ -73,15 +73,15 @@ it via: ```bash sbt "~bin/test:run main.test" sbt "~bin/test:run" -mill --watch dev . main.test -mill --watch dev . +mill --watch dev.run . main.test +mill --watch dev.run . ``` You can also test out your current Mill code with one of the hello-world example repos via: ```bash -mill dev docs/example-1 foo.run +mill dev.run docs/example-1 foo.run ``` Lastly, you can generate IntelliJ Scala project files using Mill via @@ -332,10 +332,10 @@ git clean -xdf sbt bin/test:assembly # Build Mill executable using the Mill executable generated by SBT -target/bin/mill devAssembly +target/bin/mill dev.Assembly # Build Mill executable using the Mill executable generated by Mill itself -out/devAssembly/dest/out.jar devAssembly +out/dev/assembly/dest/out.jar dev.assembly ``` Eventually, as Mill stabilizes, we will get rid of the SBT build entirely and diff --git a/scalalib/src/mill/scalalib/Lib.scala b/scalalib/src/mill/scalalib/Lib.scala index 50e19b16..208d545e 100644 --- a/scalalib/src/mill/scalalib/Lib.scala +++ b/scalalib/src/mill/scalalib/Lib.scala @@ -112,8 +112,4 @@ object Lib{ cross = false ) - val DefaultShellScript: Seq[String] = Seq( - "#!/usr/bin/env sh", - "exec java -jar \"$0\" \"$@\"" - ) } diff --git a/scalalib/src/mill/scalalib/ScalaModule.scala b/scalalib/src/mill/scalalib/ScalaModule.scala index ffaaeac8..a2888048 100644 --- a/scalalib/src/mill/scalalib/ScalaModule.scala +++ b/scalalib/src/mill/scalalib/ScalaModule.scala @@ -112,7 +112,17 @@ trait ScalaModule extends mill.Module with TaskModule { outer => } - def prependShellScript: T[String] = T{ "" } + def prependShellScript: T[String] = T{ + mainClass() match{ + case None => "" + case Some(cls) => + mill.modules.Jvm.launcherShellScript( + cls, + Agg("$0"), + forkArgs() + ) + } + } def sources = T.sources{ millSourcePath / 'src } def resources = T.sources{ millSourcePath / 'resources } @@ -189,7 +199,7 @@ trait ScalaModule extends mill.Module with TaskModule { outer => if (files.nonEmpty) subprocess( "scala.tools.nsc.ScalaDoc", - runClasspath().filter(_.path.ext != "pom").map(_.path), + scalaCompilerClasspath().map(_.path) ++ runClasspath().filter(_.path.ext != "pom").map(_.path), mainArgs = (files ++ options).toSeq ) @@ -204,6 +214,20 @@ trait ScalaModule extends mill.Module with TaskModule { outer => def forkEnv = T{ sys.env.toMap } + def launcher = T{ + mainClass() match { + case None => Result.Failure("Need to specify a main class for launcher") + case Some(cls) => + Result.Success( + Jvm.createLauncher( + cls, + runClasspath().map(_.path), + forkArgs() + ) + ) + } + } + def runLocal(args: String*) = T.command { Jvm.runLocal( finalMainClass(), @@ -219,7 +243,8 @@ trait ScalaModule extends mill.Module with TaskModule { outer => forkArgs(), forkEnv(), args, - workingDir = ammonite.ops.pwd) + workingDir = ammonite.ops.pwd + ) } diff --git a/scalaworker/src/mill/scalaworker/ScalaWorker.scala b/scalaworker/src/mill/scalaworker/ScalaWorker.scala index a98d327b..7a3bcd90 100644 --- a/scalaworker/src/mill/scalaworker/ScalaWorker.scala +++ b/scalaworker/src/mill/scalaworker/ScalaWorker.scala @@ -47,7 +47,8 @@ object ScalaWorker{ else Colors.BlackWhite, System.out, System.err, - System.err + System.err, + System.in )) val outputPath = args(4) |