summaryrefslogtreecommitdiff
path: root/main
diff options
context:
space:
mode:
authorLi Haoyi <haoyi.sg@gmail.com>2018-04-07 09:40:07 -0700
committerLi Haoyi <haoyi.sg@gmail.com>2018-04-07 11:08:17 -0700
commit9598b243d7c5108a99fd98860810f71f6302aec1 (patch)
treeecb446f42e95845453722f37927bb9e1374fb75e /main
parentcfb494443ff84c30c8fab457fdc9dcfad7d76769 (diff)
downloadmill-9598b243d7c5108a99fd98860810f71f6302aec1.tar.gz
mill-9598b243d7c5108a99fd98860810f71f6302aec1.tar.bz2
mill-9598b243d7c5108a99fd98860810f71f6302aec1.zip
first pass at moving mill client over to JavaModule
Diffstat (limited to 'main')
-rw-r--r--main/src/mill/Main.scala3
-rw-r--r--main/src/mill/main/Server.scala183
-rw-r--r--main/src/mill/modules/Jvm.scala4
-rw-r--r--main/test/src/mill/main/ClientServerTests.scala121
4 files changed, 308 insertions, 3 deletions
diff --git a/main/src/mill/Main.scala b/main/src/mill/Main.scala
index e026dfe0..4cbf30fa 100644
--- a/main/src/mill/Main.scala
+++ b/main/src/mill/Main.scala
@@ -6,6 +6,7 @@ import ammonite.main.Cli._
import ammonite.ops._
import ammonite.util.Util
import io.github.retronym.java9rtexport.Export
+import mill.client.ClientServer
import mill.eval.Evaluator
import mill.util.DummyInputStream
@@ -119,7 +120,7 @@ object Main {
stateCache
)
- if (mill.clientserver.ClientServer.isJava9OrAbove) {
+ if (ClientServer.isJava9OrAbove) {
val rt = cliConfig.home / Export.rtJarName
if (!exists(rt)) {
runner.printInfo(s"Preparing Java ${System.getProperty("java.version")} runtime; this may take a minute or two ...")
diff --git a/main/src/mill/main/Server.scala b/main/src/mill/main/Server.scala
new file mode 100644
index 00000000..ac75dfd0
--- /dev/null
+++ b/main/src/mill/main/Server.scala
@@ -0,0 +1,183 @@
+package mill.main
+
+import java.io._
+import java.net.Socket
+
+import org.scalasbt.ipcsocket._
+
+trait ServerMain[T]{
+ def main(args0: Array[String]): Unit = {
+ new Server(
+ args0(0),
+ this,
+ () => System.exit(0),
+ 300000,
+ Locks.files(args0(0))
+ ).run()
+ }
+ var stateCache = Option.empty[T]
+ def main0(args: Array[String],
+ stateCache: Option[T],
+ mainInteractive: Boolean,
+ stdin: InputStream,
+ stdout: PrintStream,
+ stderr: PrintStream): (Boolean, Option[T])
+}
+
+
+class Server[T](lockBase: String,
+ sm: ServerMain[T],
+ interruptServer: () => Unit,
+ acceptTimeout: Int,
+ locks: Locks) {
+
+ val originalStdout = System.out
+ def run() = {
+ Server.tryLockBlock(locks.processLock){
+ var running = true
+ while (running) {
+ Server.lockBlock(locks.serverLock){
+ val (serverSocket, socketClose) = if (ClientServer.isWindows) {
+ val socketName = ClientServer.WIN32_PIPE_PREFIX + new File(lockBase).getName
+ (new Win32NamedPipeServerSocket(socketName), () => new Win32NamedPipeSocket(socketName).close())
+ } else {
+ val socketName = lockBase + "/io"
+ new File(socketName).delete()
+ (new UnixDomainServerSocket(socketName), () => new UnixDomainSocket(socketName).close())
+ }
+
+ val sockOpt = Server.interruptWith(
+ acceptTimeout,
+ socketClose(),
+ serverSocket.accept()
+ )
+
+ sockOpt match{
+ case None => running = false
+ case Some(sock) =>
+ try {
+ handleRun(sock)
+ serverSocket.close()
+ }
+ catch{case e: Throwable => e.printStackTrace(originalStdout) }
+ }
+ }
+ // Make sure you give an opportunity for the client to probe the lock
+ // and realize the server has released it to signal completion
+ Thread.sleep(10)
+ }
+ }.getOrElse(throw new Exception("PID already present"))
+ }
+
+ def handleRun(clientSocket: Socket) = {
+
+ val currentOutErr = clientSocket.getOutputStream
+ val socketIn = clientSocket.getInputStream
+ val argStream = new FileInputStream(lockBase + "/run")
+ val interactive = argStream.read() != 0;
+ val args = ClientServer.parseArgs(argStream)
+ argStream.close()
+
+ var done = false
+ val t = new Thread(() =>
+
+ try {
+ val stdout = new PrintStream(new ProxyOutputStream(currentOutErr, 0), true)
+ val stderr = new PrintStream(new ProxyOutputStream(currentOutErr, 1), true)
+ val (result, newStateCache) = sm.main0(
+ args,
+ sm.stateCache,
+ interactive,
+ socketIn,
+ stdout, stderr
+ )
+
+ sm.stateCache = newStateCache
+ java.nio.file.Files.write(
+ java.nio.file.Paths.get(lockBase + "/exitCode"),
+ (if (result) 0 else 1).toString.getBytes
+ )
+ } catch{case WatchInterrupted(sc: Option[T]) =>
+ sm.stateCache = sc
+ } finally{
+ done = true
+ }
+ )
+
+ t.start()
+ // We cannot simply use Lock#await here, because the filesystem doesn't
+ // realize the clientLock/serverLock are held by different threads in the
+ // two processes and gives a spurious deadlock error
+ while(!done && !locks.clientLock.probe()) {
+ Thread.sleep(3)
+ }
+
+ if (!done) interruptServer()
+
+ t.interrupt()
+ t.stop()
+
+ if (ClientServer.isWindows) {
+ // Closing Win32NamedPipeSocket can often take ~5s
+ // It seems OK to exit the client early and subsequently
+ // start up mill client again (perhaps closing the server
+ // socket helps speed up the process).
+ val t = new Thread(() => clientSocket.close())
+ t.setDaemon(true)
+ t.start()
+ } else clientSocket.close()
+ }
+}
+object Server{
+ def lockBlock[T](lock: Lock)(t: => T): T = {
+ val l = lock.lock()
+ try t
+ finally l.release()
+ }
+ def tryLockBlock[T](lock: Lock)(t: => T): Option[T] = {
+ lock.tryLock() match{
+ case null => None
+ case l =>
+ try Some(t)
+ finally l.release()
+ }
+
+ }
+ def interruptWith[T](millis: Int, close: => Unit, t: => T): Option[T] = {
+ @volatile var interrupt = true
+ @volatile var interrupted = false
+ new Thread(() => {
+ Thread.sleep(millis)
+ if (interrupt) {
+ interrupted = true
+ close
+ }
+ }).start()
+
+ try {
+ val res =
+ try Some(t)
+ catch {case e: Throwable => None}
+
+ if (interrupted) None
+ else res
+
+ } finally {
+ interrupt = false
+ }
+ }
+}
+
+class ProxyOutputStream(x: => java.io.OutputStream,
+ key: Int) extends java.io.OutputStream {
+ override def write(b: Int) = x.synchronized{
+ x.write(key)
+ x.write(b)
+ }
+}
+class ProxyInputStream(x: => java.io.InputStream) extends java.io.InputStream{
+ def read() = x.read()
+ override def read(b: Array[Byte], off: Int, len: Int) = x.read(b, off, len)
+ override def read(b: Array[Byte]) = x.read(b)
+}
+case class WatchInterrupted[T](stateCache: Option[T]) extends Exception
diff --git a/main/src/mill/modules/Jvm.scala b/main/src/mill/modules/Jvm.scala
index 287dc624..e7fd6a79 100644
--- a/main/src/mill/modules/Jvm.scala
+++ b/main/src/mill/modules/Jvm.scala
@@ -1,6 +1,6 @@
package mill.modules
-import java.io.{ByteArrayInputStream, FileOutputStream, File}
+import java.io.{ByteArrayInputStream, File, FileOutputStream}
import java.lang.reflect.Modifier
import java.net.{URI, URLClassLoader}
import java.nio.file.{FileSystems, Files, OpenOption, StandardOpenOption}
@@ -9,7 +9,7 @@ import java.util.jar.{JarEntry, JarFile, JarOutputStream}
import ammonite.ops._
import geny.Generator
-import mill.clientserver.InputPumper
+import mill.client.InputPumper
import mill.eval.PathRef
import mill.util.{Ctx, IO}
import mill.util.Loose.Agg
diff --git a/main/test/src/mill/main/ClientServerTests.scala b/main/test/src/mill/main/ClientServerTests.scala
new file mode 100644
index 00000000..3ca7c737
--- /dev/null
+++ b/main/test/src/mill/main/ClientServerTests.scala
@@ -0,0 +1,121 @@
+package mill.main
+
+import java.io._
+import java.nio.file.Path
+
+import utest._
+class EchoServer extends ServerMain[Int]{
+ def main0(args: Array[String],
+ stateCache: Option[Int],
+ mainInteractive: Boolean,
+ stdin: InputStream,
+ stdout: PrintStream,
+ stderr: PrintStream) = {
+
+ val reader = new BufferedReader(new InputStreamReader(stdin))
+ val str = reader.readLine()
+ stdout.println(str + args(0))
+ stdout.flush()
+ stderr.println(str.toUpperCase + args(0))
+ stderr.flush()
+ (true, None)
+ }
+}
+
+object ClientServerTests extends TestSuite{
+ def initStreams() = {
+ val in = new ByteArrayInputStream("hello\n".getBytes())
+ val out = new ByteArrayOutputStream()
+ val err = new ByteArrayOutputStream()
+ (in, out, err)
+ }
+ def init() = {
+ val tmpDir = java.nio.file.Files.createTempDirectory("")
+ val locks = Locks.memory()
+
+ (tmpDir, locks)
+ }
+
+ def tests = Tests{
+ 'hello - {
+ val (tmpDir, locks) = init()
+
+ def spawnEchoServer(): Unit = {
+ new Thread(() => new Server(
+ tmpDir.toString,
+ new EchoServer(),
+ () => (),
+ 1000,
+ locks
+ ).run()).start()
+ }
+
+
+ def runClient(arg: String) = {
+ val (in, out, err) = initStreams()
+ Server.lockBlock(locks.clientLock){
+ Client.run(
+ tmpDir.toString,
+ () => spawnEchoServer(),
+ locks,
+ in,
+ out,
+ err,
+ Array(arg)
+ )
+ Thread.sleep(100)
+ (new String(out.toByteArray), new String(err.toByteArray))
+ }
+ }
+
+ // Make sure the simple "have the client start a server and
+ // exchange one message" workflow works from end to end.
+
+ assert(
+ locks.clientLock.probe(),
+ locks.serverLock.probe(),
+ locks.processLock.probe()
+ )
+
+ val (out1, err1) = runClient("world")
+
+ assert(
+ out1 == "helloworld\n",
+ err1 == "HELLOworld\n"
+ )
+
+ // Give a bit of time for the server to release the lock and
+ // re-acquire it to signal to the client that it's done
+ Thread.sleep(100)
+
+ assert(
+ locks.clientLock.probe(),
+ !locks.serverLock.probe(),
+ !locks.processLock.probe()
+ )
+
+ // A seecond client in sequence connect to the same server
+ val (out2, err2) = runClient(" WORLD")
+
+ assert(
+ out2 == "hello WORLD\n",
+ err2 == "HELLO WORLD\n"
+ )
+
+ // Make sure the server times out of not used for a while
+ Thread.sleep(2000)
+ assert(
+ locks.clientLock.probe(),
+ locks.serverLock.probe(),
+ locks.processLock.probe()
+ )
+
+ // Have a third client spawn/connect-to a new server at the same path
+ val (out3, err3) = runClient(" World")
+ assert(
+ out3 == "hello World\n",
+ err3 == "HELLO World\n"
+ )
+ }
+ }
+}