diff options
author | Li Haoyi <haoyi.sg@gmail.com> | 2018-04-07 09:40:07 -0700 |
---|---|---|
committer | Li Haoyi <haoyi.sg@gmail.com> | 2018-04-07 11:08:17 -0700 |
commit | 9598b243d7c5108a99fd98860810f71f6302aec1 (patch) | |
tree | ecb446f42e95845453722f37927bb9e1374fb75e /main/src | |
parent | cfb494443ff84c30c8fab457fdc9dcfad7d76769 (diff) | |
download | mill-9598b243d7c5108a99fd98860810f71f6302aec1.tar.gz mill-9598b243d7c5108a99fd98860810f71f6302aec1.tar.bz2 mill-9598b243d7c5108a99fd98860810f71f6302aec1.zip |
first pass at moving mill client over to JavaModule
Diffstat (limited to 'main/src')
-rw-r--r-- | main/src/mill/Main.scala | 3 | ||||
-rw-r--r-- | main/src/mill/main/Server.scala | 183 | ||||
-rw-r--r-- | main/src/mill/modules/Jvm.scala | 4 |
3 files changed, 187 insertions, 3 deletions
diff --git a/main/src/mill/Main.scala b/main/src/mill/Main.scala index e026dfe0..4cbf30fa 100644 --- a/main/src/mill/Main.scala +++ b/main/src/mill/Main.scala @@ -6,6 +6,7 @@ import ammonite.main.Cli._ import ammonite.ops._ import ammonite.util.Util import io.github.retronym.java9rtexport.Export +import mill.client.ClientServer import mill.eval.Evaluator import mill.util.DummyInputStream @@ -119,7 +120,7 @@ object Main { stateCache ) - if (mill.clientserver.ClientServer.isJava9OrAbove) { + if (ClientServer.isJava9OrAbove) { val rt = cliConfig.home / Export.rtJarName if (!exists(rt)) { runner.printInfo(s"Preparing Java ${System.getProperty("java.version")} runtime; this may take a minute or two ...") diff --git a/main/src/mill/main/Server.scala b/main/src/mill/main/Server.scala new file mode 100644 index 00000000..ac75dfd0 --- /dev/null +++ b/main/src/mill/main/Server.scala @@ -0,0 +1,183 @@ +package mill.main + +import java.io._ +import java.net.Socket + +import org.scalasbt.ipcsocket._ + +trait ServerMain[T]{ + def main(args0: Array[String]): Unit = { + new Server( + args0(0), + this, + () => System.exit(0), + 300000, + Locks.files(args0(0)) + ).run() + } + var stateCache = Option.empty[T] + def main0(args: Array[String], + stateCache: Option[T], + mainInteractive: Boolean, + stdin: InputStream, + stdout: PrintStream, + stderr: PrintStream): (Boolean, Option[T]) +} + + +class Server[T](lockBase: String, + sm: ServerMain[T], + interruptServer: () => Unit, + acceptTimeout: Int, + locks: Locks) { + + val originalStdout = System.out + def run() = { + Server.tryLockBlock(locks.processLock){ + var running = true + while (running) { + Server.lockBlock(locks.serverLock){ + val (serverSocket, socketClose) = if (ClientServer.isWindows) { + val socketName = ClientServer.WIN32_PIPE_PREFIX + new File(lockBase).getName + (new Win32NamedPipeServerSocket(socketName), () => new Win32NamedPipeSocket(socketName).close()) + } else { + val socketName = lockBase + "/io" + new File(socketName).delete() + (new UnixDomainServerSocket(socketName), () => new UnixDomainSocket(socketName).close()) + } + + val sockOpt = Server.interruptWith( + acceptTimeout, + socketClose(), + serverSocket.accept() + ) + + sockOpt match{ + case None => running = false + case Some(sock) => + try { + handleRun(sock) + serverSocket.close() + } + catch{case e: Throwable => e.printStackTrace(originalStdout) } + } + } + // Make sure you give an opportunity for the client to probe the lock + // and realize the server has released it to signal completion + Thread.sleep(10) + } + }.getOrElse(throw new Exception("PID already present")) + } + + def handleRun(clientSocket: Socket) = { + + val currentOutErr = clientSocket.getOutputStream + val socketIn = clientSocket.getInputStream + val argStream = new FileInputStream(lockBase + "/run") + val interactive = argStream.read() != 0; + val args = ClientServer.parseArgs(argStream) + argStream.close() + + var done = false + val t = new Thread(() => + + try { + val stdout = new PrintStream(new ProxyOutputStream(currentOutErr, 0), true) + val stderr = new PrintStream(new ProxyOutputStream(currentOutErr, 1), true) + val (result, newStateCache) = sm.main0( + args, + sm.stateCache, + interactive, + socketIn, + stdout, stderr + ) + + sm.stateCache = newStateCache + java.nio.file.Files.write( + java.nio.file.Paths.get(lockBase + "/exitCode"), + (if (result) 0 else 1).toString.getBytes + ) + } catch{case WatchInterrupted(sc: Option[T]) => + sm.stateCache = sc + } finally{ + done = true + } + ) + + t.start() + // We cannot simply use Lock#await here, because the filesystem doesn't + // realize the clientLock/serverLock are held by different threads in the + // two processes and gives a spurious deadlock error + while(!done && !locks.clientLock.probe()) { + Thread.sleep(3) + } + + if (!done) interruptServer() + + t.interrupt() + t.stop() + + if (ClientServer.isWindows) { + // Closing Win32NamedPipeSocket can often take ~5s + // It seems OK to exit the client early and subsequently + // start up mill client again (perhaps closing the server + // socket helps speed up the process). + val t = new Thread(() => clientSocket.close()) + t.setDaemon(true) + t.start() + } else clientSocket.close() + } +} +object Server{ + def lockBlock[T](lock: Lock)(t: => T): T = { + val l = lock.lock() + try t + finally l.release() + } + def tryLockBlock[T](lock: Lock)(t: => T): Option[T] = { + lock.tryLock() match{ + case null => None + case l => + try Some(t) + finally l.release() + } + + } + def interruptWith[T](millis: Int, close: => Unit, t: => T): Option[T] = { + @volatile var interrupt = true + @volatile var interrupted = false + new Thread(() => { + Thread.sleep(millis) + if (interrupt) { + interrupted = true + close + } + }).start() + + try { + val res = + try Some(t) + catch {case e: Throwable => None} + + if (interrupted) None + else res + + } finally { + interrupt = false + } + } +} + +class ProxyOutputStream(x: => java.io.OutputStream, + key: Int) extends java.io.OutputStream { + override def write(b: Int) = x.synchronized{ + x.write(key) + x.write(b) + } +} +class ProxyInputStream(x: => java.io.InputStream) extends java.io.InputStream{ + def read() = x.read() + override def read(b: Array[Byte], off: Int, len: Int) = x.read(b, off, len) + override def read(b: Array[Byte]) = x.read(b) +} +case class WatchInterrupted[T](stateCache: Option[T]) extends Exception diff --git a/main/src/mill/modules/Jvm.scala b/main/src/mill/modules/Jvm.scala index 287dc624..e7fd6a79 100644 --- a/main/src/mill/modules/Jvm.scala +++ b/main/src/mill/modules/Jvm.scala @@ -1,6 +1,6 @@ package mill.modules -import java.io.{ByteArrayInputStream, FileOutputStream, File} +import java.io.{ByteArrayInputStream, File, FileOutputStream} import java.lang.reflect.Modifier import java.net.{URI, URLClassLoader} import java.nio.file.{FileSystems, Files, OpenOption, StandardOpenOption} @@ -9,7 +9,7 @@ import java.util.jar.{JarEntry, JarFile, JarOutputStream} import ammonite.ops._ import geny.Generator -import mill.clientserver.InputPumper +import mill.client.InputPumper import mill.eval.PathRef import mill.util.{Ctx, IO} import mill.util.Loose.Agg |