diff options
Diffstat (limited to 'client')
-rw-r--r-- | client/src/mill/client/ClientServer.java | 97 | ||||
-rw-r--r-- | client/src/mill/client/InputPumper.java | 37 | ||||
-rw-r--r-- | client/src/mill/client/Lock.java | 13 | ||||
-rw-r--r-- | client/src/mill/client/Locked.java | 10 | ||||
-rw-r--r-- | client/src/mill/client/Locks.java | 89 | ||||
-rw-r--r-- | client/src/mill/client/Main.java | 221 |
6 files changed, 467 insertions, 0 deletions
diff --git a/client/src/mill/client/ClientServer.java b/client/src/mill/client/ClientServer.java new file mode 100644 index 00000000..468f8ab3 --- /dev/null +++ b/client/src/mill/client/ClientServer.java @@ -0,0 +1,97 @@ +package mill.client; + + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; + +public class ClientServer { + public static boolean isWindows = System.getProperty("os.name").toLowerCase().startsWith("windows"); + public static boolean isJava9OrAbove = !System.getProperty("java.specification.version").startsWith("1."); + + // Windows named pipe prefix (see https://github.com/sbt/ipcsocket/blob/v1.0.0/README.md) + // Win32NamedPipeServerSocket automatically adds this as a prefix (if it is not already is prefixed), + // but Win32NamedPipeSocket does not + // https://github.com/sbt/ipcsocket/blob/v1.0.0/src/main/java/org/scalasbt/ipcsocket/Win32NamedPipeServerSocket.java#L36 + public static String WIN32_PIPE_PREFIX = "\\\\.\\pipe\\"; + + public static String[] parseArgs(InputStream argStream) throws IOException { + + int argsLength = argStream.read(); + String[] args = new String[argsLength]; + for (int i = 0; i < args.length; i++) { + args[i] = readString(argStream); + } + return args; + } + public static void writeArgs(Boolean interactive, + String[] args, + OutputStream argStream) throws IOException { + argStream.write(interactive ? 1 : 0); + argStream.write(args.length); + int i = 0; + while (i < args.length) { + writeString(argStream, args[i]); + i += 1; + } + } + + /** + * This allows the mill client to pass the environment as he sees it to the + * server (as the server remains alive over the course of several runs and + * does not see the environment changes the client would) + */ + public static void writeMap(Map<String, String> map, OutputStream argStream) throws IOException { + argStream.write(map.size()); + for (Map.Entry<String, String> kv : map.entrySet()) { + writeString(argStream, kv.getKey()); + writeString(argStream, kv.getValue()); + } + } + + public static Map<String, String> parseMap(InputStream argStream) throws IOException { + Map<String, String> env = new HashMap<>(); + int mapLength = argStream.read(); + for (int i = 0; i < mapLength; i++) { + String key = readString(argStream); + String value = readString(argStream); + env.put(key, value); + } + return env; + } + + private static String readString(InputStream inputStream) throws IOException { + // Result is between 0 and 255, hence the loop. + int read = inputStream.read(); + int bytesToRead = read; + while(read == 255){ + read = inputStream.read(); + bytesToRead += read; + } + byte[] arr = new byte[bytesToRead]; + int readTotal = 0; + while (readTotal < bytesToRead) { + read = inputStream.read(arr, readTotal, bytesToRead - readTotal); + readTotal += read; + } + return new String(arr); + } + + private static void writeString(OutputStream outputStream, String string) throws IOException { + // When written, an int > 255 gets splitted. This logic performs the + // split beforehand so that the reading side knows that there is still + // more metadata to come before it's able to read the actual data. + // Could do with rewriting using logical masks / shifts. + byte[] bytes = string.getBytes(); + int toWrite = bytes.length; + while(toWrite >= 255){ + outputStream.write(255); + toWrite = toWrite - 255; + } + outputStream.write(toWrite); + outputStream.write(bytes); + } + +} diff --git a/client/src/mill/client/InputPumper.java b/client/src/mill/client/InputPumper.java new file mode 100644 index 00000000..1789d069 --- /dev/null +++ b/client/src/mill/client/InputPumper.java @@ -0,0 +1,37 @@ +package mill.client; + +import java.io.InputStream; +import java.io.OutputStream; + +public class InputPumper implements Runnable{ + private InputStream src; + private OutputStream dest; + private Boolean checkAvailable; + public InputPumper(InputStream src, + OutputStream dest, + Boolean checkAvailable){ + this.src = src; + this.dest = dest; + this.checkAvailable = checkAvailable; + } + + boolean running = true; + public void run() { + byte[] buffer = new byte[1024]; + try{ + while(running){ + if (checkAvailable && src.available() == 0) Thread.sleep(2); + else { + int n = src.read(buffer); + if (n == -1) running = false; + else { + dest.write(buffer, 0, n); + dest.flush(); + } + } + } + }catch(Exception e){ + throw new RuntimeException(e); + } + } +}
\ No newline at end of file diff --git a/client/src/mill/client/Lock.java b/client/src/mill/client/Lock.java new file mode 100644 index 00000000..115529d3 --- /dev/null +++ b/client/src/mill/client/Lock.java @@ -0,0 +1,13 @@ +package mill.client; +public abstract class Lock{ + abstract public Locked lock() throws Exception; + abstract public Locked tryLock() throws Exception; + public void await() throws Exception{ + lock().release(); + } + + /** + * Returns `true` if the lock is *available for taking* + */ + abstract public boolean probe() throws Exception; +}
\ No newline at end of file diff --git a/client/src/mill/client/Locked.java b/client/src/mill/client/Locked.java new file mode 100644 index 00000000..6ba7dab5 --- /dev/null +++ b/client/src/mill/client/Locked.java @@ -0,0 +1,10 @@ +package mill.client; + +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.util.concurrent.locks.ReentrantLock; + + +public interface Locked{ + public void release() throws Exception; +}
\ No newline at end of file diff --git a/client/src/mill/client/Locks.java b/client/src/mill/client/Locks.java new file mode 100644 index 00000000..3b397fce --- /dev/null +++ b/client/src/mill/client/Locks.java @@ -0,0 +1,89 @@ +package mill.client; + +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.util.concurrent.locks.ReentrantLock; + +public class Locks{ + public Lock processLock; + public Lock serverLock; + public Lock clientLock; + public static Locks files(String lockBase) throws Exception{ + return new Locks(){{ + processLock = new FileLock(lockBase + "/pid"); + + serverLock = new FileLock(lockBase + "/serverLock"); + + clientLock = new FileLock(lockBase + "/clientLock"); + }}; + } + public static Locks memory(){ + return new Locks(){{ + this.processLock = new MemoryLock(); + this.serverLock = new MemoryLock(); + this.clientLock = new MemoryLock(); + }}; + } +} +class FileLocked implements Locked{ + private java.nio.channels.FileLock lock; + public FileLocked(java.nio.channels.FileLock lock){ + this.lock = lock; + } + public void release() throws Exception{ + this.lock.release(); + } +} + +class FileLock extends Lock{ + String path; + RandomAccessFile raf; + FileChannel chan; + public FileLock(String path) throws Exception{ + this.path = path; + raf = new RandomAccessFile(path, "rw"); + chan = raf.getChannel(); + } + + public Locked lock() throws Exception{ + return new FileLocked(chan.lock()); + } + public Locked tryLock() throws Exception{ + java.nio.channels.FileLock l = chan.tryLock(); + if (l == null) return null; + else return new FileLocked(l); + } + public boolean probe()throws Exception{ + java.nio.channels.FileLock l = chan.tryLock(); + if (l == null) return false; + else { + l.release(); + return true; + } + } +} +class MemoryLocked implements Locked{ + java.util.concurrent.locks.Lock l; + public MemoryLocked(java.util.concurrent.locks.Lock l){ + this.l = l; + } + public void release() throws Exception{ + l.unlock(); + } +} + +class MemoryLock extends Lock{ + ReentrantLock innerLock = new ReentrantLock(true); + + public boolean probe(){ + return !innerLock.isLocked(); + } + public Locked lock() { + innerLock.lock(); + return new MemoryLocked(innerLock); + } + public Locked tryLock() { + if (innerLock.tryLock()) return new MemoryLocked(innerLock); + else return null; + } +} diff --git a/client/src/mill/client/Main.java b/client/src/mill/client/Main.java new file mode 100644 index 00000000..109a9a9d --- /dev/null +++ b/client/src/mill/client/Main.java @@ -0,0 +1,221 @@ +package mill.client; + +import org.scalasbt.ipcsocket.*; + +import java.io.*; +import java.net.Socket; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.channels.FileChannel; +import java.util.*; + +public class Main { + static void initServer(String lockBase, boolean setJnaNoSys) throws IOException,URISyntaxException{ + ArrayList<String> selfJars = new ArrayList<String>(); + ClassLoader current = Main.class.getClassLoader(); + while(current != null){ + if (current instanceof java.net.URLClassLoader) { + URL[] urls = ((java.net.URLClassLoader) current).getURLs(); + for (URL url: urls) { + selfJars.add(new File(url.toURI()).getCanonicalPath()); + } + } + current = current.getParent(); + } + if (ClientServer.isJava9OrAbove) { + selfJars.addAll(Arrays.asList(System.getProperty("java.class.path").split(File.pathSeparator))); + } + ArrayList<String> l = new java.util.ArrayList<String>(); + l.add("java"); + Properties props = System.getProperties(); + Iterator<String> keys = props.stringPropertyNames().iterator(); + while(keys.hasNext()){ + String k = keys.next(); + if (k.startsWith("MILL_")) l.add("-D" + k + "=" + props.getProperty(k)); + } + if (setJnaNoSys) { + l.add("-Djna.nosys=true"); + } + l.add("-cp"); + l.add(String.join(File.pathSeparator, selfJars)); + l.add("mill.main.ServerMain"); + l.add(lockBase); + + new java.lang.ProcessBuilder() + .command(l) + .redirectOutput(new java.io.File(lockBase + "/logs")) + .redirectError(new java.io.File(lockBase + "/logs")) + .start(); + } + public static void main(String[] args) throws Exception{ + boolean setJnaNoSys = System.getProperty("jna.nosys") == null; + Map<String, String> env = System.getenv(); + if (setJnaNoSys) { + System.setProperty("jna.nosys", "true"); + } + int index = 0; + while (index < 5) { + index += 1; + String lockBase = "out/mill-worker-" + index; + new java.io.File(lockBase).mkdirs(); + RandomAccessFile lockFile = new RandomAccessFile(lockBase + "/clientLock", "rw"); + FileChannel channel = lockFile.getChannel(); + java.nio.channels.FileLock tryLock = channel.tryLock(); + if (tryLock == null) { + lockFile.close(); + channel.close(); + } else { + int exitCode = Main.run( + lockBase, + new Runnable() { + @Override + public void run() { + try{ + initServer(lockBase, setJnaNoSys); + }catch(Exception e){ + throw new RuntimeException(e); + } + } + }, + Locks.files(lockBase), + System.in, + System.out, + System.err, + args, + env + ); + System.exit(exitCode); + } + } + throw new Exception("Reached max process limit: " + 5); + } + + + public static int run(String lockBase, + Runnable initServer, + Locks locks, + InputStream stdin, + OutputStream stdout, + OutputStream stderr, + String[] args, + Map<String, String> env) throws Exception{ + + FileOutputStream f = new FileOutputStream(lockBase + "/run"); + ClientServer.writeArgs(System.console() != null, args, f); + ClientServer.writeMap(env, f); + f.close(); + + boolean serverInit = false; + if (locks.processLock.probe()) { + serverInit = true; + initServer.run(); + } + while(locks.processLock.probe()) Thread.sleep(3); + + // Need to give sometime for Win32NamedPipeSocket to work + // if the server is just initialized + if (serverInit && ClientServer.isWindows) Thread.sleep(1000); + + Socket ioSocket = null; + + long retryStart = System.currentTimeMillis(); + while(ioSocket == null && System.currentTimeMillis() - retryStart < 1000){ + try{ + ioSocket = ClientServer.isWindows? + new Win32NamedPipeSocket(ClientServer.WIN32_PIPE_PREFIX + new File(lockBase).getName()) + : new UnixDomainSocket(lockBase + "/io"); + }catch(Throwable e){ + Thread.sleep(1); + } + } + if (ioSocket == null){ + throw new Exception("Failed to connect to server"); + } + InputStream outErr = ioSocket.getInputStream(); + OutputStream in = ioSocket.getOutputStream(); + ClientOutputPumper outPump = new ClientOutputPumper(outErr, stdout, stderr); + InputPumper inPump = new InputPumper(stdin, in, true); + Thread outThread = new Thread(outPump); + outThread.setDaemon(true); + Thread inThread = new Thread(inPump); + inThread.setDaemon(true); + outThread.start(); + inThread.start(); + + locks.serverLock.await(); + + try{ + return Integer.parseInt( + new BufferedReader( + new InputStreamReader( + new FileInputStream(lockBase + "/exitCode") + ) + ).readLine() + ); + } catch(Throwable e){ + return 1; + } + } +} + +class ClientOutputPumper implements Runnable{ + private InputStream src; + private OutputStream dest1; + private OutputStream dest2; + public ClientOutputPumper(InputStream src, OutputStream dest1, OutputStream dest2){ + this.src = src; + this.dest1 = dest1; + this.dest2 = dest2; + } + + public void run() { + byte[] buffer = new byte[1024]; + int state = 0; + boolean running = true; + boolean first = true; + while (running) { + try { + int n = src.read(buffer); + first = false; + if (n == -1) running = false; + else { + int i = 0; + while (i < n) { + switch (state) { + case 0: + state = buffer[i] + 1; + break; + case 1: + dest1.write(buffer[i]); + state = 0; + break; + case 2: + dest2.write(buffer[i]); + state = 0; + break; + } + + i += 1; + } + dest1.flush(); + dest2.flush(); + } + } catch (IOException e) { + // Win32NamedPipeSocket input stream somehow doesn't return -1, + // instead it throws an IOException whose message contains "ReadFile()". + // However, if it throws an IOException before ever reading some bytes, + // it could not connect to the server, so exit. + if (ClientServer.isWindows && e.getMessage().contains("ReadFile()")) { + if (first) { + System.err.println("Failed to connect to server"); + System.exit(1); + } else running = false; + } else { + e.printStackTrace(); + System.exit(1); + } + } + } + } + +} |