summaryrefslogtreecommitdiff
path: root/clientserver/src
diff options
context:
space:
mode:
authorLi Haoyi <haoyi.sg@gmail.com>2018-04-07 09:40:07 -0700
committerLi Haoyi <haoyi.sg@gmail.com>2018-04-07 11:08:17 -0700
commit9598b243d7c5108a99fd98860810f71f6302aec1 (patch)
treeecb446f42e95845453722f37927bb9e1374fb75e /clientserver/src
parentcfb494443ff84c30c8fab457fdc9dcfad7d76769 (diff)
downloadmill-9598b243d7c5108a99fd98860810f71f6302aec1.tar.gz
mill-9598b243d7c5108a99fd98860810f71f6302aec1.tar.bz2
mill-9598b243d7c5108a99fd98860810f71f6302aec1.zip
first pass at moving mill client over to JavaModule
Diffstat (limited to 'clientserver/src')
-rw-r--r--clientserver/src/mill/clientserver/Client.java219
-rw-r--r--clientserver/src/mill/clientserver/ClientServer.java42
-rw-r--r--clientserver/src/mill/clientserver/InputPumper.java37
-rw-r--r--clientserver/src/mill/clientserver/Locks.java105
-rw-r--r--clientserver/src/mill/clientserver/Server.scala183
5 files changed, 0 insertions, 586 deletions
diff --git a/clientserver/src/mill/clientserver/Client.java b/clientserver/src/mill/clientserver/Client.java
deleted file mode 100644
index c4cbc265..00000000
--- a/clientserver/src/mill/clientserver/Client.java
+++ /dev/null
@@ -1,219 +0,0 @@
-package mill.clientserver;
-
-import org.scalasbt.ipcsocket.*;
-
-import java.io.*;
-import java.net.Socket;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Properties;
-
-public class Client {
- static void initServer(String lockBase, boolean setJnaNoSys) throws IOException,URISyntaxException{
- ArrayList<String> selfJars = new ArrayList<String>();
- ClassLoader current = Client.class.getClassLoader();
- while(current != null){
- if (current instanceof java.net.URLClassLoader) {
- URL[] urls = ((java.net.URLClassLoader) current).getURLs();
- for (URL url: urls) {
- selfJars.add(new File(url.toURI()).getCanonicalPath());
- }
- }
- current = current.getParent();
- }
- if (ClientServer.isJava9OrAbove) {
- selfJars.addAll(Arrays.asList(System.getProperty("java.class.path").split(File.pathSeparator)));
- }
- ArrayList<String> l = new java.util.ArrayList<String>();
- l.add("java");
- Properties props = System.getProperties();
- Iterator<String> keys = props.stringPropertyNames().iterator();
- while(keys.hasNext()){
- String k = keys.next();
- if (k.startsWith("MILL_")) l.add("-D" + k + "=" + props.getProperty(k));
- }
- if (setJnaNoSys) {
- l.add("-Djna.nosys=true");
- }
- l.add("-cp");
- l.add(String.join(File.pathSeparator, selfJars));
- l.add("mill.ServerMain");
- l.add(lockBase);
- new java.lang.ProcessBuilder()
- .command(l)
- .redirectOutput(new java.io.File(lockBase + "/logs"))
- .redirectError(new java.io.File(lockBase + "/logs"))
- .start();
- }
- public static void main(String[] args) throws Exception{
- boolean setJnaNoSys = System.getProperty("jna.nosys") == null;
- if (setJnaNoSys) {
- System.setProperty("jna.nosys", "true");
- }
- int index = 0;
- while (index < 5) {
- index += 1;
- String lockBase = "out/mill-worker-" + index;
- new java.io.File(lockBase).mkdirs();
- RandomAccessFile lockFile = new RandomAccessFile(lockBase + "/clientLock", "rw");
- FileChannel channel = lockFile.getChannel();
- java.nio.channels.FileLock tryLock = channel.tryLock();
- if (tryLock == null) {
- lockFile.close();
- channel.close();
- } else {
- int exitCode = Client.run(
- lockBase,
- new Runnable() {
- @Override
- public void run() {
- try{
- initServer(lockBase, setJnaNoSys);
- }catch(Exception e){
- throw new RuntimeException(e);
- }
- }
- },
- Locks.files(lockBase),
- System.in,
- System.out,
- System.err,
- args
- );
- System.exit(exitCode);
- }
- }
- throw new Exception("Reached max process limit: " + 5);
- }
-
-
- public static int run(String lockBase,
- Runnable initServer,
- Locks locks,
- InputStream stdin,
- OutputStream stdout,
- OutputStream stderr,
- String[] args) throws Exception{
-
- FileOutputStream f = new FileOutputStream(lockBase + "/run");
- ClientServer.writeArgs(System.console() != null, args, f);
- f.close();
-
- boolean serverInit = false;
- if (locks.processLock.probe()) {
- serverInit = true;
- initServer.run();
- }
- while(locks.processLock.probe()) Thread.sleep(3);
-
- // Need to give sometime for Win32NamedPipeSocket to work
- // if the server is just initialized
- if (serverInit && ClientServer.isWindows) Thread.sleep(1000);
-
- Socket ioSocket = null;
-
- long retryStart = System.currentTimeMillis();
- while(ioSocket == null && System.currentTimeMillis() - retryStart < 1000){
- try{
- ioSocket = ClientServer.isWindows?
- new Win32NamedPipeSocket(ClientServer.WIN32_PIPE_PREFIX + new File(lockBase).getName())
- : new UnixDomainSocket(lockBase + "/io");
- }catch(Throwable e){
- Thread.sleep(1);
- }
- }
- if (ioSocket == null){
- throw new Exception("Failed to connect to server");
- }
- InputStream outErr = ioSocket.getInputStream();
- OutputStream in = ioSocket.getOutputStream();
- ClientOutputPumper outPump = new ClientOutputPumper(outErr, stdout, stderr);
- InputPumper inPump = new InputPumper(stdin, in, true);
- Thread outThread = new Thread(outPump);
- outThread.setDaemon(true);
- Thread inThread = new Thread(inPump);
- inThread.setDaemon(true);
- outThread.start();
- inThread.start();
-
- locks.serverLock.await();
-
- try{
- return Integer.parseInt(
- new BufferedReader(
- new InputStreamReader(
- new FileInputStream(lockBase + "/exitCode")
- )
- ).readLine()
- );
- } catch(Throwable e){
- return 1;
- }
- }
-}
-
-class ClientOutputPumper implements Runnable{
- private InputStream src;
- private OutputStream dest1;
- private OutputStream dest2;
- public ClientOutputPumper(InputStream src, OutputStream dest1, OutputStream dest2){
- this.src = src;
- this.dest1 = dest1;
- this.dest2 = dest2;
- }
-
- public void run() {
- byte[] buffer = new byte[1024];
- int state = 0;
- boolean running = true;
- boolean first = true;
- while (running) {
- try {
- int n = src.read(buffer);
- first = false;
- if (n == -1) running = false;
- else {
- int i = 0;
- while (i < n) {
- switch (state) {
- case 0:
- state = buffer[i] + 1;
- break;
- case 1:
- dest1.write(buffer[i]);
- state = 0;
- break;
- case 2:
- dest2.write(buffer[i]);
- state = 0;
- break;
- }
-
- i += 1;
- }
- dest1.flush();
- dest2.flush();
- }
- } catch (IOException e) {
- // Win32NamedPipeSocket input stream somehow doesn't return -1,
- // instead it throws an IOException whose message contains "ReadFile()".
- // However, if it throws an IOException before ever reading some bytes,
- // it could not connect to the server, so exit.
- if (ClientServer.isWindows && e.getMessage().contains("ReadFile()")) {
- if (first) {
- System.err.println("Failed to connect to server");
- System.exit(1);
- } else running = false;
- } else {
- e.printStackTrace();
- System.exit(1);
- }
- }
- }
- }
-
-}
diff --git a/clientserver/src/mill/clientserver/ClientServer.java b/clientserver/src/mill/clientserver/ClientServer.java
deleted file mode 100644
index e2e63dcf..00000000
--- a/clientserver/src/mill/clientserver/ClientServer.java
+++ /dev/null
@@ -1,42 +0,0 @@
-package mill.clientserver;
-
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-public class ClientServer {
- public static boolean isWindows = System.getProperty("os.name").toLowerCase().startsWith("windows");
- public static boolean isJava9OrAbove = !System.getProperty("java.specification.version").startsWith("1.");
-
- // Windows named pipe prefix (see https://github.com/sbt/ipcsocket/blob/v1.0.0/README.md)
- // Win32NamedPipeServerSocket automatically adds this as a prefix (if it is not already is prefixed),
- // but Win32NamedPipeSocket does not
- // https://github.com/sbt/ipcsocket/blob/v1.0.0/src/main/java/org/scalasbt/ipcsocket/Win32NamedPipeServerSocket.java#L36
- public static String WIN32_PIPE_PREFIX = "\\\\.\\pipe\\";
-
- public static String[] parseArgs(InputStream argStream) throws IOException {
-
- int argsLength = argStream.read();
- String[] args = new String[argsLength];
- for (int i = 0; i < args.length; i++) {
- int n = argStream.read();
- byte[] arr = new byte[n];
- argStream.read(arr);
- args[i] = new String(arr);
- }
- return args;
- }
- public static void writeArgs(Boolean interactive,
- String[] args,
- OutputStream argStream) throws IOException{
- argStream.write(interactive ? 1 : 0);
- argStream.write(args.length);
- int i = 0;
- while (i < args.length){
- argStream.write(args[i].length());
- argStream.write(args[i].getBytes());
- i += 1;
- }
- }
-} \ No newline at end of file
diff --git a/clientserver/src/mill/clientserver/InputPumper.java b/clientserver/src/mill/clientserver/InputPumper.java
deleted file mode 100644
index 9f4bfb82..00000000
--- a/clientserver/src/mill/clientserver/InputPumper.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package mill.clientserver;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-
-public class InputPumper implements Runnable{
- private InputStream src;
- private OutputStream dest;
- private Boolean checkAvailable;
- public InputPumper(InputStream src,
- OutputStream dest,
- Boolean checkAvailable){
- this.src = src;
- this.dest = dest;
- this.checkAvailable = checkAvailable;
- }
-
- boolean running = true;
- public void run() {
- byte[] buffer = new byte[1024];
- try{
- while(running){
- if (checkAvailable && src.available() == 0) Thread.sleep(2);
- else {
- int n = src.read(buffer);
- if (n == -1) running = false;
- else {
- dest.write(buffer, 0, n);
- dest.flush();
- }
- }
- }
- }catch(Exception e){
- throw new RuntimeException(e);
- }
- }
-} \ No newline at end of file
diff --git a/clientserver/src/mill/clientserver/Locks.java b/clientserver/src/mill/clientserver/Locks.java
deleted file mode 100644
index 78c8dc6e..00000000
--- a/clientserver/src/mill/clientserver/Locks.java
+++ /dev/null
@@ -1,105 +0,0 @@
-package mill.clientserver;
-
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
-import java.util.concurrent.locks.ReentrantLock;
-
-
-abstract class Lock{
- abstract public Locked lock() throws Exception;
- abstract public Locked tryLock() throws Exception;
- public void await() throws Exception{
- lock().release();
- }
-
- /**
- * Returns `true` if the lock is *available for taking*
- */
- abstract public boolean probe() throws Exception;
-}
-interface Locked{
- void release() throws Exception;
-}
-class Locks{
- public Lock processLock;
- public Lock serverLock;
- public Lock clientLock;
- static Locks files(String lockBase) throws Exception{
- return new Locks(){{
- processLock = new FileLock(lockBase + "/pid");
-
- serverLock = new FileLock(lockBase + "/serverLock");
-
- clientLock = new FileLock(lockBase + "/clientLock");
- }};
- }
- static Locks memory(){
- return new Locks(){{
- this.processLock = new MemoryLock();
- this.serverLock = new MemoryLock();
- this.clientLock = new MemoryLock();
- }};
- }
-}
-class FileLocked implements Locked{
- private java.nio.channels.FileLock lock;
- public FileLocked(java.nio.channels.FileLock lock){
- this.lock = lock;
- }
- public void release() throws Exception{
- this.lock.release();
- }
-}
-
-class FileLock extends Lock{
- String path;
- RandomAccessFile raf;
- FileChannel chan;
- public FileLock(String path) throws Exception{
- this.path = path;
- raf = new RandomAccessFile(path, "rw");
- chan = raf.getChannel();
- }
-
- public Locked lock() throws Exception{
- return new FileLocked(chan.lock());
- }
- public Locked tryLock() throws Exception{
- java.nio.channels.FileLock l = chan.tryLock();
- if (l == null) return null;
- else return new FileLocked(l);
- }
- public boolean probe()throws Exception{
- java.nio.channels.FileLock l = chan.tryLock();
- if (l == null) return false;
- else {
- l.release();
- return true;
- }
- }
-}
-class MemoryLocked implements Locked{
- java.util.concurrent.locks.Lock l;
- public MemoryLocked(java.util.concurrent.locks.Lock l){
- this.l = l;
- }
- public void release() throws Exception{
- l.unlock();
- }
-}
-
-class MemoryLock extends Lock{
- ReentrantLock innerLock = new ReentrantLock(true);
-
- public boolean probe(){
- return !innerLock.isLocked();
- }
- public Locked lock() {
- innerLock.lock();
- return new MemoryLocked(innerLock);
- }
- public Locked tryLock() {
- if (innerLock.tryLock()) return new MemoryLocked(innerLock);
- else return null;
- }
-}
diff --git a/clientserver/src/mill/clientserver/Server.scala b/clientserver/src/mill/clientserver/Server.scala
deleted file mode 100644
index 24827ac2..00000000
--- a/clientserver/src/mill/clientserver/Server.scala
+++ /dev/null
@@ -1,183 +0,0 @@
-package mill.clientserver
-
-import java.io._
-import java.net.Socket
-
-import org.scalasbt.ipcsocket._
-
-trait ServerMain[T]{
- def main(args0: Array[String]): Unit = {
- new Server(
- args0(0),
- this,
- () => System.exit(0),
- 300000,
- Locks.files(args0(0))
- ).run()
- }
- var stateCache = Option.empty[T]
- def main0(args: Array[String],
- stateCache: Option[T],
- mainInteractive: Boolean,
- stdin: InputStream,
- stdout: PrintStream,
- stderr: PrintStream): (Boolean, Option[T])
-}
-
-
-class Server[T](lockBase: String,
- sm: ServerMain[T],
- interruptServer: () => Unit,
- acceptTimeout: Int,
- locks: Locks) {
-
- val originalStdout = System.out
- def run() = {
- Server.tryLockBlock(locks.processLock){
- var running = true
- while (running) {
- Server.lockBlock(locks.serverLock){
- val (serverSocket, socketClose) = if (ClientServer.isWindows) {
- val socketName = ClientServer.WIN32_PIPE_PREFIX + new File(lockBase).getName
- (new Win32NamedPipeServerSocket(socketName), () => new Win32NamedPipeSocket(socketName).close())
- } else {
- val socketName = lockBase + "/io"
- new File(socketName).delete()
- (new UnixDomainServerSocket(socketName), () => new UnixDomainSocket(socketName).close())
- }
-
- val sockOpt = Server.interruptWith(
- acceptTimeout,
- socketClose(),
- serverSocket.accept()
- )
-
- sockOpt match{
- case None => running = false
- case Some(sock) =>
- try {
- handleRun(sock)
- serverSocket.close()
- }
- catch{case e: Throwable => e.printStackTrace(originalStdout) }
- }
- }
- // Make sure you give an opportunity for the client to probe the lock
- // and realize the server has released it to signal completion
- Thread.sleep(10)
- }
- }.getOrElse(throw new Exception("PID already present"))
- }
-
- def handleRun(clientSocket: Socket) = {
-
- val currentOutErr = clientSocket.getOutputStream
- val socketIn = clientSocket.getInputStream
- val argStream = new FileInputStream(lockBase + "/run")
- val interactive = argStream.read() != 0;
- val args = ClientServer.parseArgs(argStream)
- argStream.close()
-
- var done = false
- val t = new Thread(() =>
-
- try {
- val stdout = new PrintStream(new ProxyOutputStream(currentOutErr, 0), true)
- val stderr = new PrintStream(new ProxyOutputStream(currentOutErr, 1), true)
- val (result, newStateCache) = sm.main0(
- args,
- sm.stateCache,
- interactive,
- socketIn,
- stdout, stderr
- )
-
- sm.stateCache = newStateCache
- java.nio.file.Files.write(
- java.nio.file.Paths.get(lockBase + "/exitCode"),
- (if (result) 0 else 1).toString.getBytes
- )
- } catch{case WatchInterrupted(sc: Option[T]) =>
- sm.stateCache = sc
- } finally{
- done = true
- }
- )
-
- t.start()
- // We cannot simply use Lock#await here, because the filesystem doesn't
- // realize the clientLock/serverLock are held by different threads in the
- // two processes and gives a spurious deadlock error
- while(!done && !locks.clientLock.probe()) {
- Thread.sleep(3)
- }
-
- if (!done) interruptServer()
-
- t.interrupt()
- t.stop()
-
- if (ClientServer.isWindows) {
- // Closing Win32NamedPipeSocket can often take ~5s
- // It seems OK to exit the client early and subsequently
- // start up mill client again (perhaps closing the server
- // socket helps speed up the process).
- val t = new Thread(() => clientSocket.close())
- t.setDaemon(true)
- t.start()
- } else clientSocket.close()
- }
-}
-object Server{
- def lockBlock[T](lock: Lock)(t: => T): T = {
- val l = lock.lock()
- try t
- finally l.release()
- }
- def tryLockBlock[T](lock: Lock)(t: => T): Option[T] = {
- lock.tryLock() match{
- case null => None
- case l =>
- try Some(t)
- finally l.release()
- }
-
- }
- def interruptWith[T](millis: Int, close: => Unit, t: => T): Option[T] = {
- @volatile var interrupt = true
- @volatile var interrupted = false
- new Thread(() => {
- Thread.sleep(millis)
- if (interrupt) {
- interrupted = true
- close
- }
- }).start()
-
- try {
- val res =
- try Some(t)
- catch {case e: Throwable => None}
-
- if (interrupted) None
- else res
-
- } finally {
- interrupt = false
- }
- }
-}
-
-class ProxyOutputStream(x: => java.io.OutputStream,
- key: Int) extends java.io.OutputStream {
- override def write(b: Int) = x.synchronized{
- x.write(key)
- x.write(b)
- }
-}
-class ProxyInputStream(x: => java.io.InputStream) extends java.io.InputStream{
- def read() = x.read()
- override def read(b: Array[Byte], off: Int, len: Int) = x.read(b, off, len)
- override def read(b: Array[Byte]) = x.read(b)
-}
-case class WatchInterrupted[T](stateCache: Option[T]) extends Exception