From 9598b243d7c5108a99fd98860810f71f6302aec1 Mon Sep 17 00:00:00 2001 From: Li Haoyi Date: Sat, 7 Apr 2018 09:40:07 -0700 Subject: first pass at moving mill client over to JavaModule --- clientserver/src/mill/clientserver/Client.java | 219 --------------------- .../src/mill/clientserver/ClientServer.java | 42 ---- .../src/mill/clientserver/InputPumper.java | 37 ---- clientserver/src/mill/clientserver/Locks.java | 105 ---------- clientserver/src/mill/clientserver/Server.scala | 183 ----------------- 5 files changed, 586 deletions(-) delete mode 100644 clientserver/src/mill/clientserver/Client.java delete mode 100644 clientserver/src/mill/clientserver/ClientServer.java delete mode 100644 clientserver/src/mill/clientserver/InputPumper.java delete mode 100644 clientserver/src/mill/clientserver/Locks.java delete mode 100644 clientserver/src/mill/clientserver/Server.scala (limited to 'clientserver/src') diff --git a/clientserver/src/mill/clientserver/Client.java b/clientserver/src/mill/clientserver/Client.java deleted file mode 100644 index c4cbc265..00000000 --- a/clientserver/src/mill/clientserver/Client.java +++ /dev/null @@ -1,219 +0,0 @@ -package mill.clientserver; - -import org.scalasbt.ipcsocket.*; - -import java.io.*; -import java.net.Socket; -import java.net.URISyntaxException; -import java.net.URL; -import java.nio.channels.FileChannel; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.Properties; - -public class Client { - static void initServer(String lockBase, boolean setJnaNoSys) throws IOException,URISyntaxException{ - ArrayList selfJars = new ArrayList(); - ClassLoader current = Client.class.getClassLoader(); - while(current != null){ - if (current instanceof java.net.URLClassLoader) { - URL[] urls = ((java.net.URLClassLoader) current).getURLs(); - for (URL url: urls) { - selfJars.add(new File(url.toURI()).getCanonicalPath()); - } - } - current = current.getParent(); - } - if (ClientServer.isJava9OrAbove) { - selfJars.addAll(Arrays.asList(System.getProperty("java.class.path").split(File.pathSeparator))); - } - ArrayList l = new java.util.ArrayList(); - l.add("java"); - Properties props = System.getProperties(); - Iterator keys = props.stringPropertyNames().iterator(); - while(keys.hasNext()){ - String k = keys.next(); - if (k.startsWith("MILL_")) l.add("-D" + k + "=" + props.getProperty(k)); - } - if (setJnaNoSys) { - l.add("-Djna.nosys=true"); - } - l.add("-cp"); - l.add(String.join(File.pathSeparator, selfJars)); - l.add("mill.ServerMain"); - l.add(lockBase); - new java.lang.ProcessBuilder() - .command(l) - .redirectOutput(new java.io.File(lockBase + "/logs")) - .redirectError(new java.io.File(lockBase + "/logs")) - .start(); - } - public static void main(String[] args) throws Exception{ - boolean setJnaNoSys = System.getProperty("jna.nosys") == null; - if (setJnaNoSys) { - System.setProperty("jna.nosys", "true"); - } - int index = 0; - while (index < 5) { - index += 1; - String lockBase = "out/mill-worker-" + index; - new java.io.File(lockBase).mkdirs(); - RandomAccessFile lockFile = new RandomAccessFile(lockBase + "/clientLock", "rw"); - FileChannel channel = lockFile.getChannel(); - java.nio.channels.FileLock tryLock = channel.tryLock(); - if (tryLock == null) { - lockFile.close(); - channel.close(); - } else { - int exitCode = Client.run( - lockBase, - new Runnable() { - @Override - public void run() { - try{ - initServer(lockBase, setJnaNoSys); - }catch(Exception e){ - throw new RuntimeException(e); - } - } - }, - Locks.files(lockBase), - System.in, - System.out, - System.err, - args - ); - System.exit(exitCode); - } - } - throw new Exception("Reached max process limit: " + 5); - } - - - public static int run(String lockBase, - Runnable initServer, - Locks locks, - InputStream stdin, - OutputStream stdout, - OutputStream stderr, - String[] args) throws Exception{ - - FileOutputStream f = new FileOutputStream(lockBase + "/run"); - ClientServer.writeArgs(System.console() != null, args, f); - f.close(); - - boolean serverInit = false; - if (locks.processLock.probe()) { - serverInit = true; - initServer.run(); - } - while(locks.processLock.probe()) Thread.sleep(3); - - // Need to give sometime for Win32NamedPipeSocket to work - // if the server is just initialized - if (serverInit && ClientServer.isWindows) Thread.sleep(1000); - - Socket ioSocket = null; - - long retryStart = System.currentTimeMillis(); - while(ioSocket == null && System.currentTimeMillis() - retryStart < 1000){ - try{ - ioSocket = ClientServer.isWindows? - new Win32NamedPipeSocket(ClientServer.WIN32_PIPE_PREFIX + new File(lockBase).getName()) - : new UnixDomainSocket(lockBase + "/io"); - }catch(Throwable e){ - Thread.sleep(1); - } - } - if (ioSocket == null){ - throw new Exception("Failed to connect to server"); - } - InputStream outErr = ioSocket.getInputStream(); - OutputStream in = ioSocket.getOutputStream(); - ClientOutputPumper outPump = new ClientOutputPumper(outErr, stdout, stderr); - InputPumper inPump = new InputPumper(stdin, in, true); - Thread outThread = new Thread(outPump); - outThread.setDaemon(true); - Thread inThread = new Thread(inPump); - inThread.setDaemon(true); - outThread.start(); - inThread.start(); - - locks.serverLock.await(); - - try{ - return Integer.parseInt( - new BufferedReader( - new InputStreamReader( - new FileInputStream(lockBase + "/exitCode") - ) - ).readLine() - ); - } catch(Throwable e){ - return 1; - } - } -} - -class ClientOutputPumper implements Runnable{ - private InputStream src; - private OutputStream dest1; - private OutputStream dest2; - public ClientOutputPumper(InputStream src, OutputStream dest1, OutputStream dest2){ - this.src = src; - this.dest1 = dest1; - this.dest2 = dest2; - } - - public void run() { - byte[] buffer = new byte[1024]; - int state = 0; - boolean running = true; - boolean first = true; - while (running) { - try { - int n = src.read(buffer); - first = false; - if (n == -1) running = false; - else { - int i = 0; - while (i < n) { - switch (state) { - case 0: - state = buffer[i] + 1; - break; - case 1: - dest1.write(buffer[i]); - state = 0; - break; - case 2: - dest2.write(buffer[i]); - state = 0; - break; - } - - i += 1; - } - dest1.flush(); - dest2.flush(); - } - } catch (IOException e) { - // Win32NamedPipeSocket input stream somehow doesn't return -1, - // instead it throws an IOException whose message contains "ReadFile()". - // However, if it throws an IOException before ever reading some bytes, - // it could not connect to the server, so exit. - if (ClientServer.isWindows && e.getMessage().contains("ReadFile()")) { - if (first) { - System.err.println("Failed to connect to server"); - System.exit(1); - } else running = false; - } else { - e.printStackTrace(); - System.exit(1); - } - } - } - } - -} diff --git a/clientserver/src/mill/clientserver/ClientServer.java b/clientserver/src/mill/clientserver/ClientServer.java deleted file mode 100644 index e2e63dcf..00000000 --- a/clientserver/src/mill/clientserver/ClientServer.java +++ /dev/null @@ -1,42 +0,0 @@ -package mill.clientserver; - - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -public class ClientServer { - public static boolean isWindows = System.getProperty("os.name").toLowerCase().startsWith("windows"); - public static boolean isJava9OrAbove = !System.getProperty("java.specification.version").startsWith("1."); - - // Windows named pipe prefix (see https://github.com/sbt/ipcsocket/blob/v1.0.0/README.md) - // Win32NamedPipeServerSocket automatically adds this as a prefix (if it is not already is prefixed), - // but Win32NamedPipeSocket does not - // https://github.com/sbt/ipcsocket/blob/v1.0.0/src/main/java/org/scalasbt/ipcsocket/Win32NamedPipeServerSocket.java#L36 - public static String WIN32_PIPE_PREFIX = "\\\\.\\pipe\\"; - - public static String[] parseArgs(InputStream argStream) throws IOException { - - int argsLength = argStream.read(); - String[] args = new String[argsLength]; - for (int i = 0; i < args.length; i++) { - int n = argStream.read(); - byte[] arr = new byte[n]; - argStream.read(arr); - args[i] = new String(arr); - } - return args; - } - public static void writeArgs(Boolean interactive, - String[] args, - OutputStream argStream) throws IOException{ - argStream.write(interactive ? 1 : 0); - argStream.write(args.length); - int i = 0; - while (i < args.length){ - argStream.write(args[i].length()); - argStream.write(args[i].getBytes()); - i += 1; - } - } -} \ No newline at end of file diff --git a/clientserver/src/mill/clientserver/InputPumper.java b/clientserver/src/mill/clientserver/InputPumper.java deleted file mode 100644 index 9f4bfb82..00000000 --- a/clientserver/src/mill/clientserver/InputPumper.java +++ /dev/null @@ -1,37 +0,0 @@ -package mill.clientserver; - -import java.io.InputStream; -import java.io.OutputStream; - -public class InputPumper implements Runnable{ - private InputStream src; - private OutputStream dest; - private Boolean checkAvailable; - public InputPumper(InputStream src, - OutputStream dest, - Boolean checkAvailable){ - this.src = src; - this.dest = dest; - this.checkAvailable = checkAvailable; - } - - boolean running = true; - public void run() { - byte[] buffer = new byte[1024]; - try{ - while(running){ - if (checkAvailable && src.available() == 0) Thread.sleep(2); - else { - int n = src.read(buffer); - if (n == -1) running = false; - else { - dest.write(buffer, 0, n); - dest.flush(); - } - } - } - }catch(Exception e){ - throw new RuntimeException(e); - } - } -} \ No newline at end of file diff --git a/clientserver/src/mill/clientserver/Locks.java b/clientserver/src/mill/clientserver/Locks.java deleted file mode 100644 index 78c8dc6e..00000000 --- a/clientserver/src/mill/clientserver/Locks.java +++ /dev/null @@ -1,105 +0,0 @@ -package mill.clientserver; - -import java.io.RandomAccessFile; -import java.nio.channels.FileChannel; -import java.util.concurrent.locks.ReentrantLock; - - -abstract class Lock{ - abstract public Locked lock() throws Exception; - abstract public Locked tryLock() throws Exception; - public void await() throws Exception{ - lock().release(); - } - - /** - * Returns `true` if the lock is *available for taking* - */ - abstract public boolean probe() throws Exception; -} -interface Locked{ - void release() throws Exception; -} -class Locks{ - public Lock processLock; - public Lock serverLock; - public Lock clientLock; - static Locks files(String lockBase) throws Exception{ - return new Locks(){{ - processLock = new FileLock(lockBase + "/pid"); - - serverLock = new FileLock(lockBase + "/serverLock"); - - clientLock = new FileLock(lockBase + "/clientLock"); - }}; - } - static Locks memory(){ - return new Locks(){{ - this.processLock = new MemoryLock(); - this.serverLock = new MemoryLock(); - this.clientLock = new MemoryLock(); - }}; - } -} -class FileLocked implements Locked{ - private java.nio.channels.FileLock lock; - public FileLocked(java.nio.channels.FileLock lock){ - this.lock = lock; - } - public void release() throws Exception{ - this.lock.release(); - } -} - -class FileLock extends Lock{ - String path; - RandomAccessFile raf; - FileChannel chan; - public FileLock(String path) throws Exception{ - this.path = path; - raf = new RandomAccessFile(path, "rw"); - chan = raf.getChannel(); - } - - public Locked lock() throws Exception{ - return new FileLocked(chan.lock()); - } - public Locked tryLock() throws Exception{ - java.nio.channels.FileLock l = chan.tryLock(); - if (l == null) return null; - else return new FileLocked(l); - } - public boolean probe()throws Exception{ - java.nio.channels.FileLock l = chan.tryLock(); - if (l == null) return false; - else { - l.release(); - return true; - } - } -} -class MemoryLocked implements Locked{ - java.util.concurrent.locks.Lock l; - public MemoryLocked(java.util.concurrent.locks.Lock l){ - this.l = l; - } - public void release() throws Exception{ - l.unlock(); - } -} - -class MemoryLock extends Lock{ - ReentrantLock innerLock = new ReentrantLock(true); - - public boolean probe(){ - return !innerLock.isLocked(); - } - public Locked lock() { - innerLock.lock(); - return new MemoryLocked(innerLock); - } - public Locked tryLock() { - if (innerLock.tryLock()) return new MemoryLocked(innerLock); - else return null; - } -} diff --git a/clientserver/src/mill/clientserver/Server.scala b/clientserver/src/mill/clientserver/Server.scala deleted file mode 100644 index 24827ac2..00000000 --- a/clientserver/src/mill/clientserver/Server.scala +++ /dev/null @@ -1,183 +0,0 @@ -package mill.clientserver - -import java.io._ -import java.net.Socket - -import org.scalasbt.ipcsocket._ - -trait ServerMain[T]{ - def main(args0: Array[String]): Unit = { - new Server( - args0(0), - this, - () => System.exit(0), - 300000, - Locks.files(args0(0)) - ).run() - } - var stateCache = Option.empty[T] - def main0(args: Array[String], - stateCache: Option[T], - mainInteractive: Boolean, - stdin: InputStream, - stdout: PrintStream, - stderr: PrintStream): (Boolean, Option[T]) -} - - -class Server[T](lockBase: String, - sm: ServerMain[T], - interruptServer: () => Unit, - acceptTimeout: Int, - locks: Locks) { - - val originalStdout = System.out - def run() = { - Server.tryLockBlock(locks.processLock){ - var running = true - while (running) { - Server.lockBlock(locks.serverLock){ - val (serverSocket, socketClose) = if (ClientServer.isWindows) { - val socketName = ClientServer.WIN32_PIPE_PREFIX + new File(lockBase).getName - (new Win32NamedPipeServerSocket(socketName), () => new Win32NamedPipeSocket(socketName).close()) - } else { - val socketName = lockBase + "/io" - new File(socketName).delete() - (new UnixDomainServerSocket(socketName), () => new UnixDomainSocket(socketName).close()) - } - - val sockOpt = Server.interruptWith( - acceptTimeout, - socketClose(), - serverSocket.accept() - ) - - sockOpt match{ - case None => running = false - case Some(sock) => - try { - handleRun(sock) - serverSocket.close() - } - catch{case e: Throwable => e.printStackTrace(originalStdout) } - } - } - // Make sure you give an opportunity for the client to probe the lock - // and realize the server has released it to signal completion - Thread.sleep(10) - } - }.getOrElse(throw new Exception("PID already present")) - } - - def handleRun(clientSocket: Socket) = { - - val currentOutErr = clientSocket.getOutputStream - val socketIn = clientSocket.getInputStream - val argStream = new FileInputStream(lockBase + "/run") - val interactive = argStream.read() != 0; - val args = ClientServer.parseArgs(argStream) - argStream.close() - - var done = false - val t = new Thread(() => - - try { - val stdout = new PrintStream(new ProxyOutputStream(currentOutErr, 0), true) - val stderr = new PrintStream(new ProxyOutputStream(currentOutErr, 1), true) - val (result, newStateCache) = sm.main0( - args, - sm.stateCache, - interactive, - socketIn, - stdout, stderr - ) - - sm.stateCache = newStateCache - java.nio.file.Files.write( - java.nio.file.Paths.get(lockBase + "/exitCode"), - (if (result) 0 else 1).toString.getBytes - ) - } catch{case WatchInterrupted(sc: Option[T]) => - sm.stateCache = sc - } finally{ - done = true - } - ) - - t.start() - // We cannot simply use Lock#await here, because the filesystem doesn't - // realize the clientLock/serverLock are held by different threads in the - // two processes and gives a spurious deadlock error - while(!done && !locks.clientLock.probe()) { - Thread.sleep(3) - } - - if (!done) interruptServer() - - t.interrupt() - t.stop() - - if (ClientServer.isWindows) { - // Closing Win32NamedPipeSocket can often take ~5s - // It seems OK to exit the client early and subsequently - // start up mill client again (perhaps closing the server - // socket helps speed up the process). - val t = new Thread(() => clientSocket.close()) - t.setDaemon(true) - t.start() - } else clientSocket.close() - } -} -object Server{ - def lockBlock[T](lock: Lock)(t: => T): T = { - val l = lock.lock() - try t - finally l.release() - } - def tryLockBlock[T](lock: Lock)(t: => T): Option[T] = { - lock.tryLock() match{ - case null => None - case l => - try Some(t) - finally l.release() - } - - } - def interruptWith[T](millis: Int, close: => Unit, t: => T): Option[T] = { - @volatile var interrupt = true - @volatile var interrupted = false - new Thread(() => { - Thread.sleep(millis) - if (interrupt) { - interrupted = true - close - } - }).start() - - try { - val res = - try Some(t) - catch {case e: Throwable => None} - - if (interrupted) None - else res - - } finally { - interrupt = false - } - } -} - -class ProxyOutputStream(x: => java.io.OutputStream, - key: Int) extends java.io.OutputStream { - override def write(b: Int) = x.synchronized{ - x.write(key) - x.write(b) - } -} -class ProxyInputStream(x: => java.io.InputStream) extends java.io.InputStream{ - def read() = x.read() - override def read(b: Array[Byte], off: Int, len: Int) = x.read(b, off, len) - override def read(b: Array[Byte]) = x.read(b) -} -case class WatchInterrupted[T](stateCache: Option[T]) extends Exception -- cgit v1.2.3