diff options
author | Li Haoyi <haoyi.sg@gmail.com> | 2018-04-12 21:24:18 -0700 |
---|---|---|
committer | Li Haoyi <haoyi.sg@gmail.com> | 2018-04-12 22:08:03 -0700 |
commit | 2bc041237984bf674ea144ad1a14710c3ed2e47c (patch) | |
tree | 1ac405a266c03fdf4c406fd6cf8d23ea7ea09a05 /main | |
parent | 948b697aa85ccd6062ce1d001703e1c428cfa397 (diff) | |
download | mill-2bc041237984bf674ea144ad1a14710c3ed2e47c.tar.gz mill-2bc041237984bf674ea144ad1a14710c3ed2e47c.tar.bz2 mill-2bc041237984bf674ea144ad1a14710c3ed2e47c.zip |
rename modules scalaworker -> scalalib.worker, client -> main.client
Diffstat (limited to 'main')
-rw-r--r-- | main/client/src/mill/main/client/InputPumper.java | 37 | ||||
-rw-r--r-- | main/client/src/mill/main/client/Lock.java | 13 | ||||
-rw-r--r-- | main/client/src/mill/main/client/Locked.java | 10 | ||||
-rw-r--r-- | main/client/src/mill/main/client/Locks.java | 89 | ||||
-rw-r--r-- | main/client/src/mill/main/client/Main.java | 223 | ||||
-rw-r--r-- | main/client/src/mill/main/client/Util.java | 95 | ||||
-rw-r--r-- | main/client/test/src/mill/main/client/ClientTests.java | 61 | ||||
-rw-r--r-- | main/src/mill/Main.scala | 2 | ||||
-rw-r--r-- | main/src/mill/main/Server.scala | 4 | ||||
-rw-r--r-- | main/src/mill/modules/Jvm.scala | 2 | ||||
-rw-r--r-- | main/test/src/mill/main/ClientServerTests.scala | 4 |
11 files changed, 534 insertions, 6 deletions
diff --git a/main/client/src/mill/main/client/InputPumper.java b/main/client/src/mill/main/client/InputPumper.java new file mode 100644 index 00000000..5205be0b --- /dev/null +++ b/main/client/src/mill/main/client/InputPumper.java @@ -0,0 +1,37 @@ +package mill.main.client; + +import java.io.InputStream; +import java.io.OutputStream; + +public class InputPumper implements Runnable{ + private InputStream src; + private OutputStream dest; + private Boolean checkAvailable; + public InputPumper(InputStream src, + OutputStream dest, + Boolean checkAvailable){ + this.src = src; + this.dest = dest; + this.checkAvailable = checkAvailable; + } + + boolean running = true; + public void run() { + byte[] buffer = new byte[1024]; + try{ + while(running){ + if (checkAvailable && src.available() == 0) Thread.sleep(2); + else { + int n = src.read(buffer); + if (n == -1) running = false; + else { + dest.write(buffer, 0, n); + dest.flush(); + } + } + } + }catch(Exception e){ + throw new RuntimeException(e); + } + } +}
\ No newline at end of file diff --git a/main/client/src/mill/main/client/Lock.java b/main/client/src/mill/main/client/Lock.java new file mode 100644 index 00000000..890a352b --- /dev/null +++ b/main/client/src/mill/main/client/Lock.java @@ -0,0 +1,13 @@ +package mill.main.client; +public abstract class Lock{ + abstract public Locked lock() throws Exception; + abstract public Locked tryLock() throws Exception; + public void await() throws Exception{ + lock().release(); + } + + /** + * Returns `true` if the lock is *available for taking* + */ + abstract public boolean probe() throws Exception; +}
\ No newline at end of file diff --git a/main/client/src/mill/main/client/Locked.java b/main/client/src/mill/main/client/Locked.java new file mode 100644 index 00000000..e6ad3d63 --- /dev/null +++ b/main/client/src/mill/main/client/Locked.java @@ -0,0 +1,10 @@ +package mill.main.client; + +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.util.concurrent.locks.ReentrantLock; + + +public interface Locked{ + public void release() throws Exception; +}
\ No newline at end of file diff --git a/main/client/src/mill/main/client/Locks.java b/main/client/src/mill/main/client/Locks.java new file mode 100644 index 00000000..2843973d --- /dev/null +++ b/main/client/src/mill/main/client/Locks.java @@ -0,0 +1,89 @@ +package mill.main.client; + +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.util.concurrent.locks.ReentrantLock; + +public class Locks{ + public Lock processLock; + public Lock serverLock; + public Lock clientLock; + public static Locks files(String lockBase) throws Exception{ + return new Locks(){{ + processLock = new FileLock(lockBase + "/pid"); + + serverLock = new FileLock(lockBase + "/serverLock"); + + clientLock = new FileLock(lockBase + "/clientLock"); + }}; + } + public static Locks memory(){ + return new Locks(){{ + this.processLock = new MemoryLock(); + this.serverLock = new MemoryLock(); + this.clientLock = new MemoryLock(); + }}; + } +} +class FileLocked implements Locked{ + private java.nio.channels.FileLock lock; + public FileLocked(java.nio.channels.FileLock lock){ + this.lock = lock; + } + public void release() throws Exception{ + this.lock.release(); + } +} + +class FileLock extends Lock{ + String path; + RandomAccessFile raf; + FileChannel chan; + public FileLock(String path) throws Exception{ + this.path = path; + raf = new RandomAccessFile(path, "rw"); + chan = raf.getChannel(); + } + + public Locked lock() throws Exception{ + return new FileLocked(chan.lock()); + } + public Locked tryLock() throws Exception{ + java.nio.channels.FileLock l = chan.tryLock(); + if (l == null) return null; + else return new FileLocked(l); + } + public boolean probe()throws Exception{ + java.nio.channels.FileLock l = chan.tryLock(); + if (l == null) return false; + else { + l.release(); + return true; + } + } +} +class MemoryLocked implements Locked{ + java.util.concurrent.locks.Lock l; + public MemoryLocked(java.util.concurrent.locks.Lock l){ + this.l = l; + } + public void release() throws Exception{ + l.unlock(); + } +} + +class MemoryLock extends Lock{ + ReentrantLock innerLock = new ReentrantLock(true); + + public boolean probe(){ + return !innerLock.isLocked(); + } + public Locked lock() { + innerLock.lock(); + return new MemoryLocked(innerLock); + } + public Locked tryLock() { + if (innerLock.tryLock()) return new MemoryLocked(innerLock); + else return null; + } +} diff --git a/main/client/src/mill/main/client/Main.java b/main/client/src/mill/main/client/Main.java new file mode 100644 index 00000000..98445d3c --- /dev/null +++ b/main/client/src/mill/main/client/Main.java @@ -0,0 +1,223 @@ +package mill.main.client; + +import org.scalasbt.ipcsocket.*; + +import java.io.*; +import java.net.Socket; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.channels.FileChannel; +import java.util.*; + +public class Main { + static void initServer(String lockBase, boolean setJnaNoSys) throws IOException,URISyntaxException{ + ArrayList<String> selfJars = new ArrayList<String>(); + ClassLoader current = Main.class.getClassLoader(); + while(current != null){ + if (current instanceof java.net.URLClassLoader) { + URL[] urls = ((java.net.URLClassLoader) current).getURLs(); + for (URL url: urls) { + selfJars.add(new File(url.toURI()).getCanonicalPath()); + } + } + current = current.getParent(); + } + if (Util.isJava9OrAbove) { + selfJars.addAll(Arrays.asList(System.getProperty("java.class.path").split(File.pathSeparator))); + } + ArrayList<String> l = new java.util.ArrayList<String>(); + l.add("java"); + Properties props = System.getProperties(); + Iterator<String> keys = props.stringPropertyNames().iterator(); + while(keys.hasNext()){ + String k = keys.next(); + if (k.startsWith("MILL_")) l.add("-D" + k + "=" + props.getProperty(k)); + } + if (setJnaNoSys) { + l.add("-Djna.nosys=true"); + } + l.add("-cp"); + l.add(String.join(File.pathSeparator, selfJars)); + l.add("mill.main.ServerMain"); + l.add(lockBase); + + new java.lang.ProcessBuilder() + .command(l) + .redirectOutput(new java.io.File(lockBase + "/logs")) + .redirectError(new java.io.File(lockBase + "/logs")) + .start(); + } + public static void main(String[] args) throws Exception{ + boolean setJnaNoSys = System.getProperty("jna.nosys") == null; + Map<String, String> env = System.getenv(); + if (setJnaNoSys) { + System.setProperty("jna.nosys", "true"); + } + int index = 0; + while (index < 5) { + index += 1; + String lockBase = "out/mill-worker-" + index; + new java.io.File(lockBase).mkdirs(); + RandomAccessFile lockFile = new RandomAccessFile(lockBase + "/clientLock", "rw"); + FileChannel channel = lockFile.getChannel(); + java.nio.channels.FileLock tryLock = channel.tryLock(); + if (tryLock == null) { + lockFile.close(); + channel.close(); + } else { + int exitCode = Main.run( + lockBase, + new Runnable() { + @Override + public void run() { + try{ + initServer(lockBase, setJnaNoSys); + }catch(Exception e){ + throw new RuntimeException(e); + } + } + }, + Locks.files(lockBase), + System.in, + System.out, + System.err, + args, + env + ); + System.exit(exitCode); + } + } + throw new Exception("Reached max process limit: " + 5); + } + + public static int run(String lockBase, + Runnable initServer, + Locks locks, + InputStream stdin, + OutputStream stdout, + OutputStream stderr, + String[] args, + Map<String, String> env) throws Exception{ + + FileOutputStream f = new FileOutputStream(lockBase + "/run"); + f.write(System.console() != null ? 1 : 0); + Util.writeString(f, System.getProperty("MILL_VERSION")); + Util.writeArgs(args, f); + Util.writeMap(env, f); + f.close(); + + boolean serverInit = false; + if (locks.processLock.probe()) { + serverInit = true; + initServer.run(); + } + while(locks.processLock.probe()) Thread.sleep(3); + + // Need to give sometime for Win32NamedPipeSocket to work + // if the server is just initialized + if (serverInit && Util.isWindows) Thread.sleep(1000); + + Socket ioSocket = null; + + long retryStart = System.currentTimeMillis(); + while(ioSocket == null && System.currentTimeMillis() - retryStart < 1000){ + try{ + ioSocket = Util.isWindows? + new Win32NamedPipeSocket(Util.WIN32_PIPE_PREFIX + new File(lockBase).getName()) + : new UnixDomainSocket(lockBase + "/io"); + }catch(Throwable e){ + Thread.sleep(1); + } + } + if (ioSocket == null){ + throw new Exception("Failed to connect to server"); + } + + InputStream outErr = ioSocket.getInputStream(); + OutputStream in = ioSocket.getOutputStream(); + ClientOutputPumper outPump = new ClientOutputPumper(outErr, stdout, stderr); + InputPumper inPump = new InputPumper(stdin, in, true); + Thread outThread = new Thread(outPump); + outThread.setDaemon(true); + Thread inThread = new Thread(inPump); + inThread.setDaemon(true); + outThread.start(); + inThread.start(); + + locks.serverLock.await(); + + try{ + return Integer.parseInt( + new BufferedReader( + new InputStreamReader( + new FileInputStream(lockBase + "/exitCode") + ) + ).readLine() + ); + } catch(Throwable e){ + return 1; + } + } +} + +class ClientOutputPumper implements Runnable{ + private InputStream src; + private OutputStream dest1; + private OutputStream dest2; + public ClientOutputPumper(InputStream src, OutputStream dest1, OutputStream dest2){ + this.src = src; + this.dest1 = dest1; + this.dest2 = dest2; + } + + public void run() { + byte[] buffer = new byte[1024]; + int state = 0; + boolean running = true; + boolean first = true; + while (running) { + try { + int n = src.read(buffer); + first = false; + if (n == -1) running = false; + else { + int i = 0; + while (i < n) { + switch (state) { + case 0: + state = buffer[i] + 1; + break; + case 1: + dest1.write(buffer[i]); + state = 0; + break; + case 2: + dest2.write(buffer[i]); + state = 0; + break; + } + + i += 1; + } + dest1.flush(); + dest2.flush(); + } + } catch (IOException e) { + // Win32NamedPipeSocket input stream somehow doesn't return -1, + // instead it throws an IOException whose message contains "ReadFile()". + // However, if it throws an IOException before ever reading some bytes, + // it could not connect to the server, so exit. + if (Util.isWindows && e.getMessage().contains("ReadFile()")) { + if (first) { + System.err.println("Failed to connect to server"); + System.exit(1); + } else running = false; + } else { + e.printStackTrace(); + System.exit(1); + } + } + } + } + +} diff --git a/main/client/src/mill/main/client/Util.java b/main/client/src/mill/main/client/Util.java new file mode 100644 index 00000000..54361734 --- /dev/null +++ b/main/client/src/mill/main/client/Util.java @@ -0,0 +1,95 @@ +package mill.main.client; + + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; + +public class Util { + public static boolean isWindows = System.getProperty("os.name").toLowerCase().startsWith("windows"); + public static boolean isJava9OrAbove = !System.getProperty("java.specification.version").startsWith("1."); + + // Windows named pipe prefix (see https://github.com/sbt/ipcsocket/blob/v1.0.0/README.md) + // Win32NamedPipeServerSocket automatically adds this as a prefix (if it is not already is prefixed), + // but Win32NamedPipeSocket does not + // https://github.com/sbt/ipcsocket/blob/v1.0.0/src/main/java/org/scalasbt/ipcsocket/Win32NamedPipeServerSocket.java#L36 + public static String WIN32_PIPE_PREFIX = "\\\\.\\pipe\\"; + + public static String[] parseArgs(InputStream argStream) throws IOException { + + int argsLength = readInt(argStream); + String[] args = new String[argsLength]; + for (int i = 0; i < args.length; i++) { + args[i] = readString(argStream); + } + return args; + } + public static void writeArgs(String[] args, + OutputStream argStream) throws IOException { + writeInt(argStream, args.length); + for(String arg: args){ + writeString(argStream, arg); + } + } + + /** + * This allows the mill client to pass the environment as he sees it to the + * server (as the server remains alive over the course of several runs and + * does not see the environment changes the client would) + */ + public static void writeMap(Map<String, String> map, OutputStream argStream) throws IOException { + writeInt(argStream, map.size()); + for (Map.Entry<String, String> kv : map.entrySet()) { + writeString(argStream, kv.getKey()); + writeString(argStream, kv.getValue()); + } + } + + public static Map<String, String> parseMap(InputStream argStream) throws IOException { + Map<String, String> env = new HashMap<>(); + int mapLength = readInt(argStream); + for (int i = 0; i < mapLength; i++) { + String key = readString(argStream); + String value = readString(argStream); + env.put(key, value); + } + return env; + } + + public static String readString(InputStream inputStream) throws IOException { + // Result is between 0 and 255, hence the loop. + int length = readInt(inputStream); + byte[] arr = new byte[length]; + int total = 0; + while(total < length){ + int res = inputStream.read(arr, total, length-total); + if (res == -1) throw new IOException("Incomplete String"); + else{ + total += res; + } + } + return new String(arr); + } + + public static void writeString(OutputStream outputStream, String string) throws IOException { + byte[] bytes = string.getBytes(); + writeInt(outputStream, bytes.length); + outputStream.write(bytes); + } + + public static void writeInt(OutputStream out, int i) throws IOException{ + out.write((byte)(i >>> 24)); + out.write((byte)(i >>> 16)); + out.write((byte)(i >>> 8)); + out.write((byte)i); + } + public static int readInt(InputStream in) throws IOException{ + return ((in.read() & 0xFF) << 24) + + ((in.read() & 0xFF) << 16) + + ((in.read() & 0xFF) << 8) + + (in.read() & 0xFF); + } + +} diff --git a/main/client/test/src/mill/main/client/ClientTests.java b/main/client/test/src/mill/main/client/ClientTests.java new file mode 100644 index 00000000..5ae44d3f --- /dev/null +++ b/main/client/test/src/mill/main/client/ClientTests.java @@ -0,0 +1,61 @@ +package mill.main.client; + +import static org.junit.Assert.assertEquals; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; + +public class ClientTests { + @Test + public void readWriteInt() throws Exception{ + int[] examples = { + 0, 1, 126, 127, 128, 254, 255, 256, 1024, 99999, 1234567, + Integer.MAX_VALUE, Integer.MAX_VALUE / 2, Integer.MIN_VALUE + }; + for(int example0: examples){ + for(int example: new int[]{-example0, example0}){ + ByteArrayOutputStream o = new ByteArrayOutputStream(); + Util.writeInt(o, example); + ByteArrayInputStream i = new ByteArrayInputStream(o.toByteArray()); + int s = Util.readInt(i); + assertEquals(example, s); + assertEquals(i.available(), 0); + } + } + } + @Test + public void readWriteString() throws Exception{ + String[] examples = { + "", + "hello", + "i am cow", + "i am cow\nhear me moo\ni weight twice as much as you", + "我是一个叉烧包", + }; + for(String example: examples){ + checkStringRoundTrip(example); + } + } + + @Test + public void readWriteBigString() throws Exception{ + int[] lengths = {0, 1, 126, 127, 128, 254, 255, 256, 1024, 99999, 1234567}; + for(int i = 0; i < lengths.length; i++){ + final char[] bigChars = new char[lengths[i]]; + java.util.Arrays.fill(bigChars, 'X'); + checkStringRoundTrip(new String(bigChars)); + } + } + + public void checkStringRoundTrip(String example) throws Exception{ + ByteArrayOutputStream o = new ByteArrayOutputStream(); + Util.writeString(o, example); + ByteArrayInputStream i = new ByteArrayInputStream(o.toByteArray()); + String s = Util.readString(i); + assertEquals(example, s); + assertEquals(i.available(), 0); + } + + +}
\ No newline at end of file diff --git a/main/src/mill/Main.scala b/main/src/mill/Main.scala index 2992afa4..5573a325 100644 --- a/main/src/mill/Main.scala +++ b/main/src/mill/Main.scala @@ -106,7 +106,7 @@ object Main { env ) - if (mill.client.Util.isJava9OrAbove) { + if (mill.main.client.Util.isJava9OrAbove) { val rt = cliConfig.home / Export.rtJarName if (!exists(rt)) { runner.printInfo(s"Preparing Java ${System.getProperty("java.version")} runtime; this may take a minute or two ...") diff --git a/main/src/mill/main/Server.scala b/main/src/mill/main/Server.scala index 275767c8..07703bed 100644 --- a/main/src/mill/main/Server.scala +++ b/main/src/mill/main/Server.scala @@ -6,7 +6,7 @@ import java.net.Socket import mill.Main import scala.collection.JavaConverters._ import org.scalasbt.ipcsocket._ -import mill.client._ +import mill.main.client._ import mill.eval.Evaluator import mill.util.DummyInputStream @@ -28,7 +28,7 @@ object ServerMain extends mill.main.ServerMain[Evaluator.State]{ this, () => System.exit(0), 300000, - mill.client.Locks.files(args0(0)) + mill.main.client.Locks.files(args0(0)) ).run() } def main0(args: Array[String], diff --git a/main/src/mill/modules/Jvm.scala b/main/src/mill/modules/Jvm.scala index e7fd6a79..9e22f614 100644 --- a/main/src/mill/modules/Jvm.scala +++ b/main/src/mill/modules/Jvm.scala @@ -9,7 +9,7 @@ import java.util.jar.{JarEntry, JarFile, JarOutputStream} import ammonite.ops._ import geny.Generator -import mill.client.InputPumper +import mill.main.client.InputPumper import mill.eval.PathRef import mill.util.{Ctx, IO} import mill.util.Loose.Agg diff --git a/main/test/src/mill/main/ClientServerTests.scala b/main/test/src/mill/main/ClientServerTests.scala index 7139c4db..7ed826af 100644 --- a/main/test/src/mill/main/ClientServerTests.scala +++ b/main/test/src/mill/main/ClientServerTests.scala @@ -2,7 +2,7 @@ package mill.main import java.io._ import java.nio.file.Path -import mill.client.{Util, Locks} +import mill.main.client.{Util, Locks} import scala.collection.JavaConverters._ import utest._ @@ -60,7 +60,7 @@ object ClientServerTests extends TestSuite{ (env : Map[String, String], args: Array[String]) = { val (in, out, err) = initStreams() Server.lockBlock(locks.clientLock){ - mill.client.Main.run( + mill.main.client.Main.run( tmpDir.toString, () => spawnEchoServer(tmpDir, locks), locks, |