summaryrefslogtreecommitdiff
path: root/clientserver
diff options
context:
space:
mode:
authorLi Haoyi <haoyi.sg@gmail.com>2018-02-26 21:35:32 -0800
committerLi Haoyi <haoyi.sg@gmail.com>2018-02-26 21:52:42 -0800
commitfd8a2567ad32c11bcf8adbaca85bdba72bb4f935 (patch)
tree618d27157adab6508a4a6804e5c857975fcfb8cd /clientserver
parent02e64b943b90387993f8f7bd7e3cd265ee569d27 (diff)
downloadmill-fd8a2567ad32c11bcf8adbaca85bdba72bb4f935.tar.gz
mill-fd8a2567ad32c11bcf8adbaca85bdba72bb4f935.tar.bz2
mill-fd8a2567ad32c11bcf8adbaca85bdba72bb4f935.zip
Port Mill client over to Java
This helps us avoid accidentally depending on the Scala library, whose classloading takes tens to hundreds of milliseconds. This removes the last parts of the Scala library used in the client (e.g. lambdas) and reduces `mill show core.compile` from ~380ms to ~290ms
Diffstat (limited to 'clientserver')
-rw-r--r--clientserver/src/mill/clientserver/Client.java185
-rw-r--r--clientserver/src/mill/clientserver/Client.scala68
-rw-r--r--clientserver/src/mill/clientserver/ClientServer.java33
-rw-r--r--clientserver/src/mill/clientserver/ClientServer.scala155
-rw-r--r--clientserver/src/mill/clientserver/InputPumper.java37
-rw-r--r--clientserver/src/mill/clientserver/Locks.java105
-rw-r--r--clientserver/src/mill/clientserver/Locks.scala103
-rw-r--r--clientserver/src/mill/clientserver/Server.scala75
-rw-r--r--clientserver/test/src/mill/clientserver/ClientServerTests.scala10
9 files changed, 429 insertions, 342 deletions
diff --git a/clientserver/src/mill/clientserver/Client.java b/clientserver/src/mill/clientserver/Client.java
new file mode 100644
index 00000000..1870c8a4
--- /dev/null
+++ b/clientserver/src/mill/clientserver/Client.java
@@ -0,0 +1,185 @@
+package mill.clientserver;
+
+import org.scalasbt.ipcsocket.UnixDomainSocket;
+
+import java.io.*;
+import java.net.URL;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Properties;
+
+public class Client {
+ static void initServer(String lockBase) throws IOException{
+ StringBuilder selfJars = new java.lang.StringBuilder();
+ ClassLoader current = Client.class.getClassLoader();
+ while(current != null){
+ if (current instanceof java.net.URLClassLoader) {
+ URL[] urls = ((java.net.URLClassLoader) current).getURLs();
+ for (URL url: urls) {
+ if (selfJars.length() != 0) selfJars.append(':');
+ selfJars.append(url);
+ }
+ }
+ current = current.getParent();
+ }
+
+ ArrayList<String> l = new java.util.ArrayList<String>();
+ l.add("java");
+ Properties props = System.getProperties();
+ Iterator<String> keys = props.stringPropertyNames().iterator();
+ while(keys.hasNext()){
+ String k = keys.next();
+ if (k.startsWith("MILL_")) l.add("-D" + k + "=" + props.getProperty(k));
+ }
+ l.add("-cp");
+ l.add(selfJars.toString());
+ l.add("mill.ServerMain");
+ l.add(lockBase);
+ new java.lang.ProcessBuilder()
+ .command(l)
+ .redirectOutput(new java.io.File(lockBase + "/logs"))
+ .redirectError(new java.io.File(lockBase + "/logs"))
+ .start();
+ }
+ public static void main(String[] args) throws Exception{
+ int index = 0;
+ while (index < 5) {
+ index += 1;
+ String lockBase = "out/mill-worker-" + index;
+ new java.io.File(lockBase).mkdirs();
+ RandomAccessFile lockFile = new RandomAccessFile(lockBase + "/clientLock", "rw");
+ FileChannel channel = lockFile.getChannel();
+ java.nio.channels.FileLock tryLock = channel.tryLock();
+ if (tryLock == null) {
+ lockFile.close();
+ channel.close();
+ } else {
+ int exitCode = Client.run(
+ lockBase,
+ new Runnable() {
+ @Override
+ public void run() {
+ try{
+ initServer(lockBase);
+ }catch(IOException e){
+ throw new RuntimeException(e);
+ }
+ }
+ },
+ Locks.files(lockBase),
+ System.in,
+ System.out,
+ System.err,
+ args
+ );
+ System.exit(exitCode);
+ }
+ }
+ throw new Exception("Reached max process limit: " + 5);
+ }
+
+
+ public static int run(String lockBase,
+ Runnable initServer,
+ Locks locks,
+ InputStream stdin,
+ OutputStream stdout,
+ OutputStream stderr,
+ String[] args) throws Exception{
+
+ FileOutputStream f = new FileOutputStream(lockBase + "/run");
+ ClientServer.writeArgs(System.console() != null, args, f);
+ f.close();
+ if (locks.processLock.probe()) initServer.run();
+ while(locks.processLock.probe()) Thread.sleep(3);
+
+
+ UnixDomainSocket ioSocket = null;
+
+ long retryStart = System.currentTimeMillis();
+ while(ioSocket == null && System.currentTimeMillis() - retryStart < 1000){
+ try{
+ ioSocket = new UnixDomainSocket(lockBase + "/io");
+ }catch(Throwable e){
+ Thread.sleep(1);
+ }
+ }
+ if (ioSocket == null){
+ throw new Exception("Failed to connect to server");
+ }
+ InputStream outErr = ioSocket.getInputStream();
+ OutputStream in = ioSocket.getOutputStream();
+ ClientOutputPumper outPump = new ClientOutputPumper(outErr, stdout, stderr);
+ InputPumper inPump = new InputPumper(stdin, in, true);
+ Thread outThread = new Thread(outPump);
+ outThread.setDaemon(true);
+ Thread inThread = new Thread(inPump);
+ inThread.setDaemon(true);
+ outThread.start();
+ inThread.start();
+
+ locks.serverLock.await();
+
+ try{
+ return Integer.parseInt(
+ new BufferedReader(
+ new InputStreamReader(
+ new FileInputStream(lockBase + "/exitCode")
+ )
+ ).readLine()
+ );
+ } catch(Throwable e){
+ return 1;
+ }
+ }
+}
+
+class ClientOutputPumper implements Runnable{
+ private InputStream src;
+ private OutputStream dest1;
+ private OutputStream dest2;
+ public ClientOutputPumper(InputStream src, OutputStream dest1, OutputStream dest2){
+ this.src = src;
+ this.dest1 = dest1;
+ this.dest2 = dest2;
+ }
+
+ boolean running = true;
+ public void run() {
+ byte[] buffer = new byte[1024];
+ int state = 0;
+ try {
+ while(running){
+
+ int n = src.read(buffer);
+ if (n == -1) running = false;
+ else {
+ int i = 0;
+ while (i < n) {
+ switch (state) {
+ case 0:
+ state = buffer[i] + 1;
+ break;
+ case 1:
+ dest1.write(buffer[i]);
+ state = 0;
+ break;
+ case 2:
+ dest2.write(buffer[i]);
+ state = 0;
+ break;
+ }
+
+ i += 1;
+ }
+ dest1.flush();
+ dest2.flush();
+ }
+ }
+ }catch(IOException e){
+ throw new RuntimeException(e);
+ }
+ }
+
+}
diff --git a/clientserver/src/mill/clientserver/Client.scala b/clientserver/src/mill/clientserver/Client.scala
deleted file mode 100644
index 14e75b37..00000000
--- a/clientserver/src/mill/clientserver/Client.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-package mill.clientserver
-
-import java.io._
-
-import org.scalasbt.ipcsocket.UnixDomainSocket
-
-object Client{
- def WithLock[T](index: Int)(f: String => T): T = {
- val lockBase = "out/mill-worker-" + index
- new java.io.File(lockBase).mkdirs()
- val lockFile = new RandomAccessFile(lockBase+ "/clientLock", "rw")
- val channel = lockFile.getChannel
- channel.tryLock() match{
- case null =>
- lockFile.close()
- channel.close()
- if (index < 5) WithLock(index + 1)(f)
- else throw new Exception("Reached max process limit: " + 5)
- case locked =>
- try f(lockBase)
- finally{
- locked.release()
- lockFile.close()
- channel.close()
- }
- }
- }
-}
-
-class Client(lockBase: String,
- initServer: () => Unit,
- locks: Locks,
- stdin: InputStream,
- stdout: OutputStream,
- stderr: OutputStream) extends ClientServer(lockBase){
- def run(args: Array[String]): Int = {
- val f = new FileOutputStream(runFile)
- ClientServer.writeArgs(System.console() != null, args, f)
- f.close()
- if (locks.processLock.probe()) initServer()
- while(locks.processLock.probe()) Thread.sleep(3)
-
- val ioSocket = ClientServer.retry(1000, new UnixDomainSocket(ioPath))
-
- val outErr = ioSocket.getInputStream
- val in = ioSocket.getOutputStream
- val outPump = new ClientOutputPumper(outErr, stdout, stderr)
- val inPump = new ClientInputPumper(stdin, in, checkAvailable = true)
- val outThread = new Thread(outPump)
- outThread.setDaemon(true)
- val inThread = new Thread(inPump)
- inThread.setDaemon(true)
- outThread.start()
- inThread.start()
-
- locks.serverLock.await()
-
- try{
- Integer.parseInt(
- new BufferedReader(
- new InputStreamReader(
- new FileInputStream(exitCodePath)
- )
- ).readLine()
- )
- } catch{case e: Throwable => 1}
- }
-}
diff --git a/clientserver/src/mill/clientserver/ClientServer.java b/clientserver/src/mill/clientserver/ClientServer.java
new file mode 100644
index 00000000..a8692c61
--- /dev/null
+++ b/clientserver/src/mill/clientserver/ClientServer.java
@@ -0,0 +1,33 @@
+package mill.clientserver;
+
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+class ClientServer {
+ public static String[] parseArgs(InputStream argStream) throws IOException {
+
+ int argsLength = argStream.read();
+ String[] args = new String[argsLength];
+ for (int i = 0; i < args.length; i++) {
+ int n = argStream.read();
+ byte[] arr = new byte[n];
+ argStream.read(arr);
+ args[i] = new String(arr);
+ }
+ return args;
+ }
+ public static void writeArgs(Boolean interactive,
+ String[] args,
+ OutputStream argStream) throws IOException{
+ argStream.write(interactive ? 1 : 0);
+ argStream.write(args.length);
+ int i = 0;
+ while (i < args.length){
+ argStream.write(args[i].length());
+ argStream.write(args[i].getBytes());
+ i += 1;
+ }
+ }
+} \ No newline at end of file
diff --git a/clientserver/src/mill/clientserver/ClientServer.scala b/clientserver/src/mill/clientserver/ClientServer.scala
deleted file mode 100644
index 9ca3a138..00000000
--- a/clientserver/src/mill/clientserver/ClientServer.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-package mill.clientserver
-
-import java.io.{FileInputStream, InputStream, OutputStream, RandomAccessFile}
-import java.nio.channels.FileChannel
-
-import scala.annotation.tailrec
-
-class ClientServer(lockBase: String){
- val ioPath = lockBase + "/io"
- val exitCodePath = lockBase + "/exitCode"
- val logFile = new java.io.File(lockBase + "/log")
- val runFile = new java.io.File(lockBase + "/run")
-}
-
-object ClientServer{
- def parseArgs(argStream: InputStream) = {
- val interactive = argStream.read() != 0
- val argsLength = argStream.read()
- val args = Array.fill(argsLength){
- val n = argStream.read()
- val arr = new Array[Byte](n)
- argStream.read(arr)
- new String(arr)
- }
- (interactive, args)
- }
- def writeArgs(interactive: Boolean, args: Array[String], argStream: OutputStream) = {
- argStream.write(if (interactive) 1 else 0)
- argStream.write(args.length)
- var i = 0
- while (i < args.length){
- argStream.write(args(i).length)
- argStream.write(args(i).getBytes)
- i += 1
- }
- }
- @tailrec def retry[T](millis: Long, t: => T): T = {
- val current = System.currentTimeMillis()
- val res =
- try Some(t)
- catch{case e: Throwable if System.currentTimeMillis() < current + millis =>
- None
- }
- res match{
- case Some(t) => t
- case None =>
- Thread.sleep(1)
- retry(millis - (System.currentTimeMillis() - current), t)
- }
- }
-
- def interruptWith[T](millis: Int, close: => Unit, t: => T): Option[T] = {
- @volatile var interrupt = true
- @volatile var interrupted = false
- new Thread(() => {
- Thread.sleep(millis)
- if (interrupt) {
- interrupted = true
- close
- }
- }).start()
-
- try {
- val res =
- try Some(t)
- catch {case e: Throwable => None}
-
- if (interrupted) None
- else res
-
- } finally {
- interrupt = false
- }
- }
-
- def polling[T](probe: => Boolean, cb: () => Unit)(t: => T): T = {
- var probing = true
- val probeThread = new Thread(() => while(probing){
- if (probe){
- probing = false
- cb()
- }
- Thread.sleep(1000)
- })
- probeThread.start()
- try t
- finally probing = false
- }
-}
-object ProxyOutputStream{
- val lock = new Object
-}
-class ProxyOutputStream(x: => java.io.OutputStream,
- key: Int) extends java.io.OutputStream {
- override def write(b: Int) = ProxyOutputStream.lock.synchronized{
- x.write(key)
- x.write(b)
- }
-}
-class ProxyInputStream(x: => java.io.InputStream) extends java.io.InputStream{
- def read() = x.read()
- override def read(b: Array[Byte], off: Int, len: Int) = x.read(b, off, len)
- override def read(b: Array[Byte]) = x.read(b)
-}
-
-class ClientInputPumper(src: InputStream,
- dest: OutputStream,
- checkAvailable: Boolean = false) extends Runnable{
- var running = true
- def run() = {
- val buffer = new Array[Byte](1024)
- while(running){
- if (checkAvailable && src.available() == 0) Thread.sleep(2)
- else {
- val n = src.read(buffer)
- if (n == -1) running = false
- else {
- dest.write(buffer, 0, n)
- dest.flush()
- }
- }
- }
- }
-
-}
-class ClientOutputPumper(src: InputStream, dest1: OutputStream, dest2: OutputStream) extends Runnable{
- var running = true
- def run() = {
- val buffer = new Array[Byte](1024)
- var state = 0
- while(running){
- val n = src.read(buffer)
- if (n == -1) running = false
- else {
- var i = 0
- while (i < n){
- state match{
- case 0 => state = buffer(i) + 1
- case 1 =>
- dest1.write(buffer(i))
- state = 0
- case 2 =>
- dest2.write(buffer(i))
- state = 0
- }
-
- i += 1
- }
- dest1.flush()
- dest2.flush()
- }
- }
- }
-
-} \ No newline at end of file
diff --git a/clientserver/src/mill/clientserver/InputPumper.java b/clientserver/src/mill/clientserver/InputPumper.java
new file mode 100644
index 00000000..9f4bfb82
--- /dev/null
+++ b/clientserver/src/mill/clientserver/InputPumper.java
@@ -0,0 +1,37 @@
+package mill.clientserver;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public class InputPumper implements Runnable{
+ private InputStream src;
+ private OutputStream dest;
+ private Boolean checkAvailable;
+ public InputPumper(InputStream src,
+ OutputStream dest,
+ Boolean checkAvailable){
+ this.src = src;
+ this.dest = dest;
+ this.checkAvailable = checkAvailable;
+ }
+
+ boolean running = true;
+ public void run() {
+ byte[] buffer = new byte[1024];
+ try{
+ while(running){
+ if (checkAvailable && src.available() == 0) Thread.sleep(2);
+ else {
+ int n = src.read(buffer);
+ if (n == -1) running = false;
+ else {
+ dest.write(buffer, 0, n);
+ dest.flush();
+ }
+ }
+ }
+ }catch(Exception e){
+ throw new RuntimeException(e);
+ }
+ }
+} \ No newline at end of file
diff --git a/clientserver/src/mill/clientserver/Locks.java b/clientserver/src/mill/clientserver/Locks.java
new file mode 100644
index 00000000..78c8dc6e
--- /dev/null
+++ b/clientserver/src/mill/clientserver/Locks.java
@@ -0,0 +1,105 @@
+package mill.clientserver;
+
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+abstract class Lock{
+ abstract public Locked lock() throws Exception;
+ abstract public Locked tryLock() throws Exception;
+ public void await() throws Exception{
+ lock().release();
+ }
+
+ /**
+ * Returns `true` if the lock is *available for taking*
+ */
+ abstract public boolean probe() throws Exception;
+}
+interface Locked{
+ void release() throws Exception;
+}
+class Locks{
+ public Lock processLock;
+ public Lock serverLock;
+ public Lock clientLock;
+ static Locks files(String lockBase) throws Exception{
+ return new Locks(){{
+ processLock = new FileLock(lockBase + "/pid");
+
+ serverLock = new FileLock(lockBase + "/serverLock");
+
+ clientLock = new FileLock(lockBase + "/clientLock");
+ }};
+ }
+ static Locks memory(){
+ return new Locks(){{
+ this.processLock = new MemoryLock();
+ this.serverLock = new MemoryLock();
+ this.clientLock = new MemoryLock();
+ }};
+ }
+}
+class FileLocked implements Locked{
+ private java.nio.channels.FileLock lock;
+ public FileLocked(java.nio.channels.FileLock lock){
+ this.lock = lock;
+ }
+ public void release() throws Exception{
+ this.lock.release();
+ }
+}
+
+class FileLock extends Lock{
+ String path;
+ RandomAccessFile raf;
+ FileChannel chan;
+ public FileLock(String path) throws Exception{
+ this.path = path;
+ raf = new RandomAccessFile(path, "rw");
+ chan = raf.getChannel();
+ }
+
+ public Locked lock() throws Exception{
+ return new FileLocked(chan.lock());
+ }
+ public Locked tryLock() throws Exception{
+ java.nio.channels.FileLock l = chan.tryLock();
+ if (l == null) return null;
+ else return new FileLocked(l);
+ }
+ public boolean probe()throws Exception{
+ java.nio.channels.FileLock l = chan.tryLock();
+ if (l == null) return false;
+ else {
+ l.release();
+ return true;
+ }
+ }
+}
+class MemoryLocked implements Locked{
+ java.util.concurrent.locks.Lock l;
+ public MemoryLocked(java.util.concurrent.locks.Lock l){
+ this.l = l;
+ }
+ public void release() throws Exception{
+ l.unlock();
+ }
+}
+
+class MemoryLock extends Lock{
+ ReentrantLock innerLock = new ReentrantLock(true);
+
+ public boolean probe(){
+ return !innerLock.isLocked();
+ }
+ public Locked lock() {
+ innerLock.lock();
+ return new MemoryLocked(innerLock);
+ }
+ public Locked tryLock() {
+ if (innerLock.tryLock()) return new MemoryLocked(innerLock);
+ else return null;
+ }
+}
diff --git a/clientserver/src/mill/clientserver/Locks.scala b/clientserver/src/mill/clientserver/Locks.scala
deleted file mode 100644
index d1644719..00000000
--- a/clientserver/src/mill/clientserver/Locks.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-package mill.clientserver
-
-import java.io.RandomAccessFile
-import java.nio.channels.FileChannel
-import java.util.concurrent.locks.{ReadWriteLock, ReentrantLock}
-
-
-trait Lock{
- def lock(): Locked
- def lockBlock[T](t: => T): T = {
- val l = lock()
- try t
- finally l.release()
- }
- def tryLockBlock[T](t: => T): Option[T] = {
- tryLock() match{
- case None =>
- None
- case Some(l) =>
- try Some(t)
- finally l.release()
- }
-
- }
- def tryLock(): Option[Locked]
- def await(): Unit = {
- val l = lock()
- l.release()
- }
-
- /**
- * Returns `true` if the lock is *available for taking*
- */
- def probe(): Boolean
-}
-trait Locked{
- def release(): Unit
-}
-trait Locks{
- val processLock: Lock
- val serverLock: Lock
- val clientLock: Lock
-}
-class FileLocked(lock: java.nio.channels.FileLock) extends Locked{
- def release() = {
- lock.release()
- }
-}
-
-class FileLock(path: String) extends Lock{
-
- val raf = new RandomAccessFile(path, "rw")
- val chan = raf.getChannel
- def lock() = {
- val lock = chan.lock()
- new FileLocked(lock)
- }
- def tryLock() = {
- chan.tryLock() match{
- case null => None
- case lock => Some(new FileLocked(lock))
- }
- }
- def probe(): Boolean = tryLock() match{
- case None => false
- case Some(locked) =>
- locked.release()
- true
- }
-}
-class FileLocks(lockBase: String) extends Locks{
- val processLock = new FileLock(lockBase + "/pid")
-
- val serverLock = new FileLock(lockBase + "/serverLock")
-
- val clientLock = new FileLock(lockBase + "/clientLock")
-}
-class MemoryLocked(l: java.util.concurrent.locks.Lock) extends Locked{
- def release() = l.unlock()
-}
-
-class MemoryLock() extends Lock{
- val innerLock = new ReentrantLock(true)
-
- def probe() = !innerLock.isLocked
- def lock() = {
- innerLock.lock()
- new MemoryLocked(innerLock)
- }
- def tryLock() = {
- innerLock.tryLock() match{
- case false => None
- case true => Some(new MemoryLocked(innerLock))
- }
- }
-}
-class MemoryLocks() extends Locks{
- val processLock = new MemoryLock()
-
- val serverLock = new MemoryLock()
-
- val clientLock = new MemoryLock()
-} \ No newline at end of file
diff --git a/clientserver/src/mill/clientserver/Server.scala b/clientserver/src/mill/clientserver/Server.scala
index 04ddbf0c..27c43302 100644
--- a/clientserver/src/mill/clientserver/Server.scala
+++ b/clientserver/src/mill/clientserver/Server.scala
@@ -12,7 +12,7 @@ trait ServerMain[T]{
this,
() => System.exit(0),
60000,
- new FileLocks(args0(0))
+ Locks.files(args0(0))
).run()
}
var stateCache = Option.empty[T]
@@ -29,19 +29,19 @@ class Server[T](lockBase: String,
sm: ServerMain[T],
interruptServer: () => Unit,
acceptTimeout: Int,
- locks: Locks) extends ClientServer(lockBase){
+ locks: Locks) {
val originalStdout = System.out
def run() = {
- locks.processLock.tryLockBlock{
+ Server.tryLockBlock(locks.processLock){
var running = true
while (running) {
- locks.serverLock.lockBlock{
- new File(ioPath).delete()
- val ioSocket = new UnixDomainServerSocket(ioPath)
- val sockOpt = ClientServer.interruptWith(
+ Server.lockBlock(locks.serverLock){
+ new File(lockBase + "/io").delete()
+ val ioSocket = new UnixDomainServerSocket(lockBase + "/io")
+ val sockOpt = Server.interruptWith(
acceptTimeout,
- new UnixDomainSocket(ioPath).close(),
+ new UnixDomainSocket(lockBase + "/io").close(),
ioSocket.accept()
)
@@ -63,8 +63,9 @@ class Server[T](lockBase: String,
val currentOutErr = clientSocket.getOutputStream
val socketIn = clientSocket.getInputStream
- val argStream = new FileInputStream(runFile)
- val (interactive, args) = ClientServer.parseArgs(argStream)
+ val argStream = new FileInputStream(lockBase + "/run")
+ val interactive = argStream.read() != 0;
+ val args = ClientServer.parseArgs(argStream)
argStream.close()
var done = false
@@ -83,7 +84,7 @@ class Server[T](lockBase: String,
sm.stateCache = newStateCache
java.nio.file.Files.write(
- java.nio.file.Paths.get(exitCodePath),
+ java.nio.file.Paths.get(lockBase + "/exitCode"),
(if (result) 0 else 1).toString.getBytes
)
} catch{case WatchInterrupted(sc: Option[T]) =>
@@ -108,4 +109,56 @@ class Server[T](lockBase: String,
clientSocket.close()
}
}
+object Server{
+ def lockBlock[T](lock: Lock)(t: => T): T = {
+ val l = lock.lock()
+ try t
+ finally l.release()
+ }
+ def tryLockBlock[T](lock: Lock)(t: => T): Option[T] = {
+ lock.tryLock() match{
+ case null => None
+ case l =>
+ try Some(t)
+ finally l.release()
+ }
+
+ }
+ def interruptWith[T](millis: Int, close: => Unit, t: => T): Option[T] = {
+ @volatile var interrupt = true
+ @volatile var interrupted = false
+ new Thread(() => {
+ Thread.sleep(millis)
+ if (interrupt) {
+ interrupted = true
+ close
+ }
+ }).start()
+
+ try {
+ val res =
+ try Some(t)
+ catch {case e: Throwable => None}
+
+ if (interrupted) None
+ else res
+
+ } finally {
+ interrupt = false
+ }
+ }
+}
+
+class ProxyOutputStream(x: => java.io.OutputStream,
+ key: Int) extends java.io.OutputStream {
+ override def write(b: Int) = x.synchronized{
+ x.write(key)
+ x.write(b)
+ }
+}
+class ProxyInputStream(x: => java.io.InputStream) extends java.io.InputStream{
+ def read() = x.read()
+ override def read(b: Array[Byte], off: Int, len: Int) = x.read(b, off, len)
+ override def read(b: Array[Byte]) = x.read(b)
+}
case class WatchInterrupted[T](stateCache: Option[T]) extends Exception \ No newline at end of file
diff --git a/clientserver/test/src/mill/clientserver/ClientServerTests.scala b/clientserver/test/src/mill/clientserver/ClientServerTests.scala
index ed9a49d9..2c9a57b0 100644
--- a/clientserver/test/src/mill/clientserver/ClientServerTests.scala
+++ b/clientserver/test/src/mill/clientserver/ClientServerTests.scala
@@ -30,7 +30,7 @@ object ClientServerTests extends TestSuite{
}
def init() = {
val tmpDir = java.nio.file.Files.createTempDirectory("")
- val locks = new MemoryLocks()
+ val locks = Locks.memory()
(tmpDir, locks)
}
@@ -52,16 +52,16 @@ object ClientServerTests extends TestSuite{
def runClient(arg: String) = {
val (in, out, err) = initStreams()
- locks.clientLock.lockBlock{
- val c = new Client(
+ Server.lockBlock(locks.clientLock){
+ Client.run(
tmpDir.toString,
() => spawnEchoServer(),
locks,
in,
out,
- err
+ err,
+ Array(arg)
)
- c.run(Array(arg))
Thread.sleep(100)
(new String(out.toByteArray), new String(err.toByteArray))
}