summaryrefslogtreecommitdiff
path: root/main
diff options
context:
space:
mode:
authorLi Haoyi <haoyi.sg@gmail.com>2018-04-12 21:24:18 -0700
committerLi Haoyi <haoyi.sg@gmail.com>2018-04-12 22:08:03 -0700
commit2bc041237984bf674ea144ad1a14710c3ed2e47c (patch)
tree1ac405a266c03fdf4c406fd6cf8d23ea7ea09a05 /main
parent948b697aa85ccd6062ce1d001703e1c428cfa397 (diff)
downloadmill-2bc041237984bf674ea144ad1a14710c3ed2e47c.tar.gz
mill-2bc041237984bf674ea144ad1a14710c3ed2e47c.tar.bz2
mill-2bc041237984bf674ea144ad1a14710c3ed2e47c.zip
rename modules scalaworker -> scalalib.worker, client -> main.client
Diffstat (limited to 'main')
-rw-r--r--main/client/src/mill/main/client/InputPumper.java37
-rw-r--r--main/client/src/mill/main/client/Lock.java13
-rw-r--r--main/client/src/mill/main/client/Locked.java10
-rw-r--r--main/client/src/mill/main/client/Locks.java89
-rw-r--r--main/client/src/mill/main/client/Main.java223
-rw-r--r--main/client/src/mill/main/client/Util.java95
-rw-r--r--main/client/test/src/mill/main/client/ClientTests.java61
-rw-r--r--main/src/mill/Main.scala2
-rw-r--r--main/src/mill/main/Server.scala4
-rw-r--r--main/src/mill/modules/Jvm.scala2
-rw-r--r--main/test/src/mill/main/ClientServerTests.scala4
11 files changed, 534 insertions, 6 deletions
diff --git a/main/client/src/mill/main/client/InputPumper.java b/main/client/src/mill/main/client/InputPumper.java
new file mode 100644
index 00000000..5205be0b
--- /dev/null
+++ b/main/client/src/mill/main/client/InputPumper.java
@@ -0,0 +1,37 @@
+package mill.main.client;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public class InputPumper implements Runnable{
+ private InputStream src;
+ private OutputStream dest;
+ private Boolean checkAvailable;
+ public InputPumper(InputStream src,
+ OutputStream dest,
+ Boolean checkAvailable){
+ this.src = src;
+ this.dest = dest;
+ this.checkAvailable = checkAvailable;
+ }
+
+ boolean running = true;
+ public void run() {
+ byte[] buffer = new byte[1024];
+ try{
+ while(running){
+ if (checkAvailable && src.available() == 0) Thread.sleep(2);
+ else {
+ int n = src.read(buffer);
+ if (n == -1) running = false;
+ else {
+ dest.write(buffer, 0, n);
+ dest.flush();
+ }
+ }
+ }
+ }catch(Exception e){
+ throw new RuntimeException(e);
+ }
+ }
+} \ No newline at end of file
diff --git a/main/client/src/mill/main/client/Lock.java b/main/client/src/mill/main/client/Lock.java
new file mode 100644
index 00000000..890a352b
--- /dev/null
+++ b/main/client/src/mill/main/client/Lock.java
@@ -0,0 +1,13 @@
+package mill.main.client;
+public abstract class Lock{
+ abstract public Locked lock() throws Exception;
+ abstract public Locked tryLock() throws Exception;
+ public void await() throws Exception{
+ lock().release();
+ }
+
+ /**
+ * Returns `true` if the lock is *available for taking*
+ */
+ abstract public boolean probe() throws Exception;
+} \ No newline at end of file
diff --git a/main/client/src/mill/main/client/Locked.java b/main/client/src/mill/main/client/Locked.java
new file mode 100644
index 00000000..e6ad3d63
--- /dev/null
+++ b/main/client/src/mill/main/client/Locked.java
@@ -0,0 +1,10 @@
+package mill.main.client;
+
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+public interface Locked{
+ public void release() throws Exception;
+} \ No newline at end of file
diff --git a/main/client/src/mill/main/client/Locks.java b/main/client/src/mill/main/client/Locks.java
new file mode 100644
index 00000000..2843973d
--- /dev/null
+++ b/main/client/src/mill/main/client/Locks.java
@@ -0,0 +1,89 @@
+package mill.main.client;
+
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class Locks{
+ public Lock processLock;
+ public Lock serverLock;
+ public Lock clientLock;
+ public static Locks files(String lockBase) throws Exception{
+ return new Locks(){{
+ processLock = new FileLock(lockBase + "/pid");
+
+ serverLock = new FileLock(lockBase + "/serverLock");
+
+ clientLock = new FileLock(lockBase + "/clientLock");
+ }};
+ }
+ public static Locks memory(){
+ return new Locks(){{
+ this.processLock = new MemoryLock();
+ this.serverLock = new MemoryLock();
+ this.clientLock = new MemoryLock();
+ }};
+ }
+}
+class FileLocked implements Locked{
+ private java.nio.channels.FileLock lock;
+ public FileLocked(java.nio.channels.FileLock lock){
+ this.lock = lock;
+ }
+ public void release() throws Exception{
+ this.lock.release();
+ }
+}
+
+class FileLock extends Lock{
+ String path;
+ RandomAccessFile raf;
+ FileChannel chan;
+ public FileLock(String path) throws Exception{
+ this.path = path;
+ raf = new RandomAccessFile(path, "rw");
+ chan = raf.getChannel();
+ }
+
+ public Locked lock() throws Exception{
+ return new FileLocked(chan.lock());
+ }
+ public Locked tryLock() throws Exception{
+ java.nio.channels.FileLock l = chan.tryLock();
+ if (l == null) return null;
+ else return new FileLocked(l);
+ }
+ public boolean probe()throws Exception{
+ java.nio.channels.FileLock l = chan.tryLock();
+ if (l == null) return false;
+ else {
+ l.release();
+ return true;
+ }
+ }
+}
+class MemoryLocked implements Locked{
+ java.util.concurrent.locks.Lock l;
+ public MemoryLocked(java.util.concurrent.locks.Lock l){
+ this.l = l;
+ }
+ public void release() throws Exception{
+ l.unlock();
+ }
+}
+
+class MemoryLock extends Lock{
+ ReentrantLock innerLock = new ReentrantLock(true);
+
+ public boolean probe(){
+ return !innerLock.isLocked();
+ }
+ public Locked lock() {
+ innerLock.lock();
+ return new MemoryLocked(innerLock);
+ }
+ public Locked tryLock() {
+ if (innerLock.tryLock()) return new MemoryLocked(innerLock);
+ else return null;
+ }
+}
diff --git a/main/client/src/mill/main/client/Main.java b/main/client/src/mill/main/client/Main.java
new file mode 100644
index 00000000..98445d3c
--- /dev/null
+++ b/main/client/src/mill/main/client/Main.java
@@ -0,0 +1,223 @@
+package mill.main.client;
+
+import org.scalasbt.ipcsocket.*;
+
+import java.io.*;
+import java.net.Socket;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.channels.FileChannel;
+import java.util.*;
+
+public class Main {
+ static void initServer(String lockBase, boolean setJnaNoSys) throws IOException,URISyntaxException{
+ ArrayList<String> selfJars = new ArrayList<String>();
+ ClassLoader current = Main.class.getClassLoader();
+ while(current != null){
+ if (current instanceof java.net.URLClassLoader) {
+ URL[] urls = ((java.net.URLClassLoader) current).getURLs();
+ for (URL url: urls) {
+ selfJars.add(new File(url.toURI()).getCanonicalPath());
+ }
+ }
+ current = current.getParent();
+ }
+ if (Util.isJava9OrAbove) {
+ selfJars.addAll(Arrays.asList(System.getProperty("java.class.path").split(File.pathSeparator)));
+ }
+ ArrayList<String> l = new java.util.ArrayList<String>();
+ l.add("java");
+ Properties props = System.getProperties();
+ Iterator<String> keys = props.stringPropertyNames().iterator();
+ while(keys.hasNext()){
+ String k = keys.next();
+ if (k.startsWith("MILL_")) l.add("-D" + k + "=" + props.getProperty(k));
+ }
+ if (setJnaNoSys) {
+ l.add("-Djna.nosys=true");
+ }
+ l.add("-cp");
+ l.add(String.join(File.pathSeparator, selfJars));
+ l.add("mill.main.ServerMain");
+ l.add(lockBase);
+
+ new java.lang.ProcessBuilder()
+ .command(l)
+ .redirectOutput(new java.io.File(lockBase + "/logs"))
+ .redirectError(new java.io.File(lockBase + "/logs"))
+ .start();
+ }
+ public static void main(String[] args) throws Exception{
+ boolean setJnaNoSys = System.getProperty("jna.nosys") == null;
+ Map<String, String> env = System.getenv();
+ if (setJnaNoSys) {
+ System.setProperty("jna.nosys", "true");
+ }
+ int index = 0;
+ while (index < 5) {
+ index += 1;
+ String lockBase = "out/mill-worker-" + index;
+ new java.io.File(lockBase).mkdirs();
+ RandomAccessFile lockFile = new RandomAccessFile(lockBase + "/clientLock", "rw");
+ FileChannel channel = lockFile.getChannel();
+ java.nio.channels.FileLock tryLock = channel.tryLock();
+ if (tryLock == null) {
+ lockFile.close();
+ channel.close();
+ } else {
+ int exitCode = Main.run(
+ lockBase,
+ new Runnable() {
+ @Override
+ public void run() {
+ try{
+ initServer(lockBase, setJnaNoSys);
+ }catch(Exception e){
+ throw new RuntimeException(e);
+ }
+ }
+ },
+ Locks.files(lockBase),
+ System.in,
+ System.out,
+ System.err,
+ args,
+ env
+ );
+ System.exit(exitCode);
+ }
+ }
+ throw new Exception("Reached max process limit: " + 5);
+ }
+
+ public static int run(String lockBase,
+ Runnable initServer,
+ Locks locks,
+ InputStream stdin,
+ OutputStream stdout,
+ OutputStream stderr,
+ String[] args,
+ Map<String, String> env) throws Exception{
+
+ FileOutputStream f = new FileOutputStream(lockBase + "/run");
+ f.write(System.console() != null ? 1 : 0);
+ Util.writeString(f, System.getProperty("MILL_VERSION"));
+ Util.writeArgs(args, f);
+ Util.writeMap(env, f);
+ f.close();
+
+ boolean serverInit = false;
+ if (locks.processLock.probe()) {
+ serverInit = true;
+ initServer.run();
+ }
+ while(locks.processLock.probe()) Thread.sleep(3);
+
+ // Need to give sometime for Win32NamedPipeSocket to work
+ // if the server is just initialized
+ if (serverInit && Util.isWindows) Thread.sleep(1000);
+
+ Socket ioSocket = null;
+
+ long retryStart = System.currentTimeMillis();
+ while(ioSocket == null && System.currentTimeMillis() - retryStart < 1000){
+ try{
+ ioSocket = Util.isWindows?
+ new Win32NamedPipeSocket(Util.WIN32_PIPE_PREFIX + new File(lockBase).getName())
+ : new UnixDomainSocket(lockBase + "/io");
+ }catch(Throwable e){
+ Thread.sleep(1);
+ }
+ }
+ if (ioSocket == null){
+ throw new Exception("Failed to connect to server");
+ }
+
+ InputStream outErr = ioSocket.getInputStream();
+ OutputStream in = ioSocket.getOutputStream();
+ ClientOutputPumper outPump = new ClientOutputPumper(outErr, stdout, stderr);
+ InputPumper inPump = new InputPumper(stdin, in, true);
+ Thread outThread = new Thread(outPump);
+ outThread.setDaemon(true);
+ Thread inThread = new Thread(inPump);
+ inThread.setDaemon(true);
+ outThread.start();
+ inThread.start();
+
+ locks.serverLock.await();
+
+ try{
+ return Integer.parseInt(
+ new BufferedReader(
+ new InputStreamReader(
+ new FileInputStream(lockBase + "/exitCode")
+ )
+ ).readLine()
+ );
+ } catch(Throwable e){
+ return 1;
+ }
+ }
+}
+
+class ClientOutputPumper implements Runnable{
+ private InputStream src;
+ private OutputStream dest1;
+ private OutputStream dest2;
+ public ClientOutputPumper(InputStream src, OutputStream dest1, OutputStream dest2){
+ this.src = src;
+ this.dest1 = dest1;
+ this.dest2 = dest2;
+ }
+
+ public void run() {
+ byte[] buffer = new byte[1024];
+ int state = 0;
+ boolean running = true;
+ boolean first = true;
+ while (running) {
+ try {
+ int n = src.read(buffer);
+ first = false;
+ if (n == -1) running = false;
+ else {
+ int i = 0;
+ while (i < n) {
+ switch (state) {
+ case 0:
+ state = buffer[i] + 1;
+ break;
+ case 1:
+ dest1.write(buffer[i]);
+ state = 0;
+ break;
+ case 2:
+ dest2.write(buffer[i]);
+ state = 0;
+ break;
+ }
+
+ i += 1;
+ }
+ dest1.flush();
+ dest2.flush();
+ }
+ } catch (IOException e) {
+ // Win32NamedPipeSocket input stream somehow doesn't return -1,
+ // instead it throws an IOException whose message contains "ReadFile()".
+ // However, if it throws an IOException before ever reading some bytes,
+ // it could not connect to the server, so exit.
+ if (Util.isWindows && e.getMessage().contains("ReadFile()")) {
+ if (first) {
+ System.err.println("Failed to connect to server");
+ System.exit(1);
+ } else running = false;
+ } else {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+ }
+ }
+
+}
diff --git a/main/client/src/mill/main/client/Util.java b/main/client/src/mill/main/client/Util.java
new file mode 100644
index 00000000..54361734
--- /dev/null
+++ b/main/client/src/mill/main/client/Util.java
@@ -0,0 +1,95 @@
+package mill.main.client;
+
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+public class Util {
+ public static boolean isWindows = System.getProperty("os.name").toLowerCase().startsWith("windows");
+ public static boolean isJava9OrAbove = !System.getProperty("java.specification.version").startsWith("1.");
+
+ // Windows named pipe prefix (see https://github.com/sbt/ipcsocket/blob/v1.0.0/README.md)
+ // Win32NamedPipeServerSocket automatically adds this as a prefix (if it is not already is prefixed),
+ // but Win32NamedPipeSocket does not
+ // https://github.com/sbt/ipcsocket/blob/v1.0.0/src/main/java/org/scalasbt/ipcsocket/Win32NamedPipeServerSocket.java#L36
+ public static String WIN32_PIPE_PREFIX = "\\\\.\\pipe\\";
+
+ public static String[] parseArgs(InputStream argStream) throws IOException {
+
+ int argsLength = readInt(argStream);
+ String[] args = new String[argsLength];
+ for (int i = 0; i < args.length; i++) {
+ args[i] = readString(argStream);
+ }
+ return args;
+ }
+ public static void writeArgs(String[] args,
+ OutputStream argStream) throws IOException {
+ writeInt(argStream, args.length);
+ for(String arg: args){
+ writeString(argStream, arg);
+ }
+ }
+
+ /**
+ * This allows the mill client to pass the environment as he sees it to the
+ * server (as the server remains alive over the course of several runs and
+ * does not see the environment changes the client would)
+ */
+ public static void writeMap(Map<String, String> map, OutputStream argStream) throws IOException {
+ writeInt(argStream, map.size());
+ for (Map.Entry<String, String> kv : map.entrySet()) {
+ writeString(argStream, kv.getKey());
+ writeString(argStream, kv.getValue());
+ }
+ }
+
+ public static Map<String, String> parseMap(InputStream argStream) throws IOException {
+ Map<String, String> env = new HashMap<>();
+ int mapLength = readInt(argStream);
+ for (int i = 0; i < mapLength; i++) {
+ String key = readString(argStream);
+ String value = readString(argStream);
+ env.put(key, value);
+ }
+ return env;
+ }
+
+ public static String readString(InputStream inputStream) throws IOException {
+ // Result is between 0 and 255, hence the loop.
+ int length = readInt(inputStream);
+ byte[] arr = new byte[length];
+ int total = 0;
+ while(total < length){
+ int res = inputStream.read(arr, total, length-total);
+ if (res == -1) throw new IOException("Incomplete String");
+ else{
+ total += res;
+ }
+ }
+ return new String(arr);
+ }
+
+ public static void writeString(OutputStream outputStream, String string) throws IOException {
+ byte[] bytes = string.getBytes();
+ writeInt(outputStream, bytes.length);
+ outputStream.write(bytes);
+ }
+
+ public static void writeInt(OutputStream out, int i) throws IOException{
+ out.write((byte)(i >>> 24));
+ out.write((byte)(i >>> 16));
+ out.write((byte)(i >>> 8));
+ out.write((byte)i);
+ }
+ public static int readInt(InputStream in) throws IOException{
+ return ((in.read() & 0xFF) << 24) +
+ ((in.read() & 0xFF) << 16) +
+ ((in.read() & 0xFF) << 8) +
+ (in.read() & 0xFF);
+ }
+
+}
diff --git a/main/client/test/src/mill/main/client/ClientTests.java b/main/client/test/src/mill/main/client/ClientTests.java
new file mode 100644
index 00000000..5ae44d3f
--- /dev/null
+++ b/main/client/test/src/mill/main/client/ClientTests.java
@@ -0,0 +1,61 @@
+package mill.main.client;
+
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+public class ClientTests {
+ @Test
+ public void readWriteInt() throws Exception{
+ int[] examples = {
+ 0, 1, 126, 127, 128, 254, 255, 256, 1024, 99999, 1234567,
+ Integer.MAX_VALUE, Integer.MAX_VALUE / 2, Integer.MIN_VALUE
+ };
+ for(int example0: examples){
+ for(int example: new int[]{-example0, example0}){
+ ByteArrayOutputStream o = new ByteArrayOutputStream();
+ Util.writeInt(o, example);
+ ByteArrayInputStream i = new ByteArrayInputStream(o.toByteArray());
+ int s = Util.readInt(i);
+ assertEquals(example, s);
+ assertEquals(i.available(), 0);
+ }
+ }
+ }
+ @Test
+ public void readWriteString() throws Exception{
+ String[] examples = {
+ "",
+ "hello",
+ "i am cow",
+ "i am cow\nhear me moo\ni weight twice as much as you",
+ "我是一个叉烧包",
+ };
+ for(String example: examples){
+ checkStringRoundTrip(example);
+ }
+ }
+
+ @Test
+ public void readWriteBigString() throws Exception{
+ int[] lengths = {0, 1, 126, 127, 128, 254, 255, 256, 1024, 99999, 1234567};
+ for(int i = 0; i < lengths.length; i++){
+ final char[] bigChars = new char[lengths[i]];
+ java.util.Arrays.fill(bigChars, 'X');
+ checkStringRoundTrip(new String(bigChars));
+ }
+ }
+
+ public void checkStringRoundTrip(String example) throws Exception{
+ ByteArrayOutputStream o = new ByteArrayOutputStream();
+ Util.writeString(o, example);
+ ByteArrayInputStream i = new ByteArrayInputStream(o.toByteArray());
+ String s = Util.readString(i);
+ assertEquals(example, s);
+ assertEquals(i.available(), 0);
+ }
+
+
+} \ No newline at end of file
diff --git a/main/src/mill/Main.scala b/main/src/mill/Main.scala
index 2992afa4..5573a325 100644
--- a/main/src/mill/Main.scala
+++ b/main/src/mill/Main.scala
@@ -106,7 +106,7 @@ object Main {
env
)
- if (mill.client.Util.isJava9OrAbove) {
+ if (mill.main.client.Util.isJava9OrAbove) {
val rt = cliConfig.home / Export.rtJarName
if (!exists(rt)) {
runner.printInfo(s"Preparing Java ${System.getProperty("java.version")} runtime; this may take a minute or two ...")
diff --git a/main/src/mill/main/Server.scala b/main/src/mill/main/Server.scala
index 275767c8..07703bed 100644
--- a/main/src/mill/main/Server.scala
+++ b/main/src/mill/main/Server.scala
@@ -6,7 +6,7 @@ import java.net.Socket
import mill.Main
import scala.collection.JavaConverters._
import org.scalasbt.ipcsocket._
-import mill.client._
+import mill.main.client._
import mill.eval.Evaluator
import mill.util.DummyInputStream
@@ -28,7 +28,7 @@ object ServerMain extends mill.main.ServerMain[Evaluator.State]{
this,
() => System.exit(0),
300000,
- mill.client.Locks.files(args0(0))
+ mill.main.client.Locks.files(args0(0))
).run()
}
def main0(args: Array[String],
diff --git a/main/src/mill/modules/Jvm.scala b/main/src/mill/modules/Jvm.scala
index e7fd6a79..9e22f614 100644
--- a/main/src/mill/modules/Jvm.scala
+++ b/main/src/mill/modules/Jvm.scala
@@ -9,7 +9,7 @@ import java.util.jar.{JarEntry, JarFile, JarOutputStream}
import ammonite.ops._
import geny.Generator
-import mill.client.InputPumper
+import mill.main.client.InputPumper
import mill.eval.PathRef
import mill.util.{Ctx, IO}
import mill.util.Loose.Agg
diff --git a/main/test/src/mill/main/ClientServerTests.scala b/main/test/src/mill/main/ClientServerTests.scala
index 7139c4db..7ed826af 100644
--- a/main/test/src/mill/main/ClientServerTests.scala
+++ b/main/test/src/mill/main/ClientServerTests.scala
@@ -2,7 +2,7 @@ package mill.main
import java.io._
import java.nio.file.Path
-import mill.client.{Util, Locks}
+import mill.main.client.{Util, Locks}
import scala.collection.JavaConverters._
import utest._
@@ -60,7 +60,7 @@ object ClientServerTests extends TestSuite{
(env : Map[String, String], args: Array[String]) = {
val (in, out, err) = initStreams()
Server.lockBlock(locks.clientLock){
- mill.client.Main.run(
+ mill.main.client.Main.run(
tmpDir.toString,
() => spawnEchoServer(tmpDir, locks),
locks,