summaryrefslogtreecommitdiff
path: root/clientserver/src
diff options
context:
space:
mode:
Diffstat (limited to 'clientserver/src')
-rw-r--r--clientserver/src/mill/clientserver/Client.java185
-rw-r--r--clientserver/src/mill/clientserver/Client.scala68
-rw-r--r--clientserver/src/mill/clientserver/ClientServer.java33
-rw-r--r--clientserver/src/mill/clientserver/ClientServer.scala155
-rw-r--r--clientserver/src/mill/clientserver/InputPumper.java37
-rw-r--r--clientserver/src/mill/clientserver/Locks.java105
-rw-r--r--clientserver/src/mill/clientserver/Locks.scala103
-rw-r--r--clientserver/src/mill/clientserver/Server.scala75
8 files changed, 424 insertions, 337 deletions
diff --git a/clientserver/src/mill/clientserver/Client.java b/clientserver/src/mill/clientserver/Client.java
new file mode 100644
index 00000000..1870c8a4
--- /dev/null
+++ b/clientserver/src/mill/clientserver/Client.java
@@ -0,0 +1,185 @@
+package mill.clientserver;
+
+import org.scalasbt.ipcsocket.UnixDomainSocket;
+
+import java.io.*;
+import java.net.URL;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Properties;
+
+public class Client {
+ static void initServer(String lockBase) throws IOException{
+ StringBuilder selfJars = new java.lang.StringBuilder();
+ ClassLoader current = Client.class.getClassLoader();
+ while(current != null){
+ if (current instanceof java.net.URLClassLoader) {
+ URL[] urls = ((java.net.URLClassLoader) current).getURLs();
+ for (URL url: urls) {
+ if (selfJars.length() != 0) selfJars.append(':');
+ selfJars.append(url);
+ }
+ }
+ current = current.getParent();
+ }
+
+ ArrayList<String> l = new java.util.ArrayList<String>();
+ l.add("java");
+ Properties props = System.getProperties();
+ Iterator<String> keys = props.stringPropertyNames().iterator();
+ while(keys.hasNext()){
+ String k = keys.next();
+ if (k.startsWith("MILL_")) l.add("-D" + k + "=" + props.getProperty(k));
+ }
+ l.add("-cp");
+ l.add(selfJars.toString());
+ l.add("mill.ServerMain");
+ l.add(lockBase);
+ new java.lang.ProcessBuilder()
+ .command(l)
+ .redirectOutput(new java.io.File(lockBase + "/logs"))
+ .redirectError(new java.io.File(lockBase + "/logs"))
+ .start();
+ }
+ public static void main(String[] args) throws Exception{
+ int index = 0;
+ while (index < 5) {
+ index += 1;
+ String lockBase = "out/mill-worker-" + index;
+ new java.io.File(lockBase).mkdirs();
+ RandomAccessFile lockFile = new RandomAccessFile(lockBase + "/clientLock", "rw");
+ FileChannel channel = lockFile.getChannel();
+ java.nio.channels.FileLock tryLock = channel.tryLock();
+ if (tryLock == null) {
+ lockFile.close();
+ channel.close();
+ } else {
+ int exitCode = Client.run(
+ lockBase,
+ new Runnable() {
+ @Override
+ public void run() {
+ try{
+ initServer(lockBase);
+ }catch(IOException e){
+ throw new RuntimeException(e);
+ }
+ }
+ },
+ Locks.files(lockBase),
+ System.in,
+ System.out,
+ System.err,
+ args
+ );
+ System.exit(exitCode);
+ }
+ }
+ throw new Exception("Reached max process limit: " + 5);
+ }
+
+
+ public static int run(String lockBase,
+ Runnable initServer,
+ Locks locks,
+ InputStream stdin,
+ OutputStream stdout,
+ OutputStream stderr,
+ String[] args) throws Exception{
+
+ FileOutputStream f = new FileOutputStream(lockBase + "/run");
+ ClientServer.writeArgs(System.console() != null, args, f);
+ f.close();
+ if (locks.processLock.probe()) initServer.run();
+ while(locks.processLock.probe()) Thread.sleep(3);
+
+
+ UnixDomainSocket ioSocket = null;
+
+ long retryStart = System.currentTimeMillis();
+ while(ioSocket == null && System.currentTimeMillis() - retryStart < 1000){
+ try{
+ ioSocket = new UnixDomainSocket(lockBase + "/io");
+ }catch(Throwable e){
+ Thread.sleep(1);
+ }
+ }
+ if (ioSocket == null){
+ throw new Exception("Failed to connect to server");
+ }
+ InputStream outErr = ioSocket.getInputStream();
+ OutputStream in = ioSocket.getOutputStream();
+ ClientOutputPumper outPump = new ClientOutputPumper(outErr, stdout, stderr);
+ InputPumper inPump = new InputPumper(stdin, in, true);
+ Thread outThread = new Thread(outPump);
+ outThread.setDaemon(true);
+ Thread inThread = new Thread(inPump);
+ inThread.setDaemon(true);
+ outThread.start();
+ inThread.start();
+
+ locks.serverLock.await();
+
+ try{
+ return Integer.parseInt(
+ new BufferedReader(
+ new InputStreamReader(
+ new FileInputStream(lockBase + "/exitCode")
+ )
+ ).readLine()
+ );
+ } catch(Throwable e){
+ return 1;
+ }
+ }
+}
+
+class ClientOutputPumper implements Runnable{
+ private InputStream src;
+ private OutputStream dest1;
+ private OutputStream dest2;
+ public ClientOutputPumper(InputStream src, OutputStream dest1, OutputStream dest2){
+ this.src = src;
+ this.dest1 = dest1;
+ this.dest2 = dest2;
+ }
+
+ boolean running = true;
+ public void run() {
+ byte[] buffer = new byte[1024];
+ int state = 0;
+ try {
+ while(running){
+
+ int n = src.read(buffer);
+ if (n == -1) running = false;
+ else {
+ int i = 0;
+ while (i < n) {
+ switch (state) {
+ case 0:
+ state = buffer[i] + 1;
+ break;
+ case 1:
+ dest1.write(buffer[i]);
+ state = 0;
+ break;
+ case 2:
+ dest2.write(buffer[i]);
+ state = 0;
+ break;
+ }
+
+ i += 1;
+ }
+ dest1.flush();
+ dest2.flush();
+ }
+ }
+ }catch(IOException e){
+ throw new RuntimeException(e);
+ }
+ }
+
+}
diff --git a/clientserver/src/mill/clientserver/Client.scala b/clientserver/src/mill/clientserver/Client.scala
deleted file mode 100644
index 14e75b37..00000000
--- a/clientserver/src/mill/clientserver/Client.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-package mill.clientserver
-
-import java.io._
-
-import org.scalasbt.ipcsocket.UnixDomainSocket
-
-object Client{
- def WithLock[T](index: Int)(f: String => T): T = {
- val lockBase = "out/mill-worker-" + index
- new java.io.File(lockBase).mkdirs()
- val lockFile = new RandomAccessFile(lockBase+ "/clientLock", "rw")
- val channel = lockFile.getChannel
- channel.tryLock() match{
- case null =>
- lockFile.close()
- channel.close()
- if (index < 5) WithLock(index + 1)(f)
- else throw new Exception("Reached max process limit: " + 5)
- case locked =>
- try f(lockBase)
- finally{
- locked.release()
- lockFile.close()
- channel.close()
- }
- }
- }
-}
-
-class Client(lockBase: String,
- initServer: () => Unit,
- locks: Locks,
- stdin: InputStream,
- stdout: OutputStream,
- stderr: OutputStream) extends ClientServer(lockBase){
- def run(args: Array[String]): Int = {
- val f = new FileOutputStream(runFile)
- ClientServer.writeArgs(System.console() != null, args, f)
- f.close()
- if (locks.processLock.probe()) initServer()
- while(locks.processLock.probe()) Thread.sleep(3)
-
- val ioSocket = ClientServer.retry(1000, new UnixDomainSocket(ioPath))
-
- val outErr = ioSocket.getInputStream
- val in = ioSocket.getOutputStream
- val outPump = new ClientOutputPumper(outErr, stdout, stderr)
- val inPump = new ClientInputPumper(stdin, in, checkAvailable = true)
- val outThread = new Thread(outPump)
- outThread.setDaemon(true)
- val inThread = new Thread(inPump)
- inThread.setDaemon(true)
- outThread.start()
- inThread.start()
-
- locks.serverLock.await()
-
- try{
- Integer.parseInt(
- new BufferedReader(
- new InputStreamReader(
- new FileInputStream(exitCodePath)
- )
- ).readLine()
- )
- } catch{case e: Throwable => 1}
- }
-}
diff --git a/clientserver/src/mill/clientserver/ClientServer.java b/clientserver/src/mill/clientserver/ClientServer.java
new file mode 100644
index 00000000..a8692c61
--- /dev/null
+++ b/clientserver/src/mill/clientserver/ClientServer.java
@@ -0,0 +1,33 @@
+package mill.clientserver;
+
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+class ClientServer {
+ public static String[] parseArgs(InputStream argStream) throws IOException {
+
+ int argsLength = argStream.read();
+ String[] args = new String[argsLength];
+ for (int i = 0; i < args.length; i++) {
+ int n = argStream.read();
+ byte[] arr = new byte[n];
+ argStream.read(arr);
+ args[i] = new String(arr);
+ }
+ return args;
+ }
+ public static void writeArgs(Boolean interactive,
+ String[] args,
+ OutputStream argStream) throws IOException{
+ argStream.write(interactive ? 1 : 0);
+ argStream.write(args.length);
+ int i = 0;
+ while (i < args.length){
+ argStream.write(args[i].length());
+ argStream.write(args[i].getBytes());
+ i += 1;
+ }
+ }
+} \ No newline at end of file
diff --git a/clientserver/src/mill/clientserver/ClientServer.scala b/clientserver/src/mill/clientserver/ClientServer.scala
deleted file mode 100644
index 9ca3a138..00000000
--- a/clientserver/src/mill/clientserver/ClientServer.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-package mill.clientserver
-
-import java.io.{FileInputStream, InputStream, OutputStream, RandomAccessFile}
-import java.nio.channels.FileChannel
-
-import scala.annotation.tailrec
-
-class ClientServer(lockBase: String){
- val ioPath = lockBase + "/io"
- val exitCodePath = lockBase + "/exitCode"
- val logFile = new java.io.File(lockBase + "/log")
- val runFile = new java.io.File(lockBase + "/run")
-}
-
-object ClientServer{
- def parseArgs(argStream: InputStream) = {
- val interactive = argStream.read() != 0
- val argsLength = argStream.read()
- val args = Array.fill(argsLength){
- val n = argStream.read()
- val arr = new Array[Byte](n)
- argStream.read(arr)
- new String(arr)
- }
- (interactive, args)
- }
- def writeArgs(interactive: Boolean, args: Array[String], argStream: OutputStream) = {
- argStream.write(if (interactive) 1 else 0)
- argStream.write(args.length)
- var i = 0
- while (i < args.length){
- argStream.write(args(i).length)
- argStream.write(args(i).getBytes)
- i += 1
- }
- }
- @tailrec def retry[T](millis: Long, t: => T): T = {
- val current = System.currentTimeMillis()
- val res =
- try Some(t)
- catch{case e: Throwable if System.currentTimeMillis() < current + millis =>
- None
- }
- res match{
- case Some(t) => t
- case None =>
- Thread.sleep(1)
- retry(millis - (System.currentTimeMillis() - current), t)
- }
- }
-
- def interruptWith[T](millis: Int, close: => Unit, t: => T): Option[T] = {
- @volatile var interrupt = true
- @volatile var interrupted = false
- new Thread(() => {
- Thread.sleep(millis)
- if (interrupt) {
- interrupted = true
- close
- }
- }).start()
-
- try {
- val res =
- try Some(t)
- catch {case e: Throwable => None}
-
- if (interrupted) None
- else res
-
- } finally {
- interrupt = false
- }
- }
-
- def polling[T](probe: => Boolean, cb: () => Unit)(t: => T): T = {
- var probing = true
- val probeThread = new Thread(() => while(probing){
- if (probe){
- probing = false
- cb()
- }
- Thread.sleep(1000)
- })
- probeThread.start()
- try t
- finally probing = false
- }
-}
-object ProxyOutputStream{
- val lock = new Object
-}
-class ProxyOutputStream(x: => java.io.OutputStream,
- key: Int) extends java.io.OutputStream {
- override def write(b: Int) = ProxyOutputStream.lock.synchronized{
- x.write(key)
- x.write(b)
- }
-}
-class ProxyInputStream(x: => java.io.InputStream) extends java.io.InputStream{
- def read() = x.read()
- override def read(b: Array[Byte], off: Int, len: Int) = x.read(b, off, len)
- override def read(b: Array[Byte]) = x.read(b)
-}
-
-class ClientInputPumper(src: InputStream,
- dest: OutputStream,
- checkAvailable: Boolean = false) extends Runnable{
- var running = true
- def run() = {
- val buffer = new Array[Byte](1024)
- while(running){
- if (checkAvailable && src.available() == 0) Thread.sleep(2)
- else {
- val n = src.read(buffer)
- if (n == -1) running = false
- else {
- dest.write(buffer, 0, n)
- dest.flush()
- }
- }
- }
- }
-
-}
-class ClientOutputPumper(src: InputStream, dest1: OutputStream, dest2: OutputStream) extends Runnable{
- var running = true
- def run() = {
- val buffer = new Array[Byte](1024)
- var state = 0
- while(running){
- val n = src.read(buffer)
- if (n == -1) running = false
- else {
- var i = 0
- while (i < n){
- state match{
- case 0 => state = buffer(i) + 1
- case 1 =>
- dest1.write(buffer(i))
- state = 0
- case 2 =>
- dest2.write(buffer(i))
- state = 0
- }
-
- i += 1
- }
- dest1.flush()
- dest2.flush()
- }
- }
- }
-
-} \ No newline at end of file
diff --git a/clientserver/src/mill/clientserver/InputPumper.java b/clientserver/src/mill/clientserver/InputPumper.java
new file mode 100644
index 00000000..9f4bfb82
--- /dev/null
+++ b/clientserver/src/mill/clientserver/InputPumper.java
@@ -0,0 +1,37 @@
+package mill.clientserver;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public class InputPumper implements Runnable{
+ private InputStream src;
+ private OutputStream dest;
+ private Boolean checkAvailable;
+ public InputPumper(InputStream src,
+ OutputStream dest,
+ Boolean checkAvailable){
+ this.src = src;
+ this.dest = dest;
+ this.checkAvailable = checkAvailable;
+ }
+
+ boolean running = true;
+ public void run() {
+ byte[] buffer = new byte[1024];
+ try{
+ while(running){
+ if (checkAvailable && src.available() == 0) Thread.sleep(2);
+ else {
+ int n = src.read(buffer);
+ if (n == -1) running = false;
+ else {
+ dest.write(buffer, 0, n);
+ dest.flush();
+ }
+ }
+ }
+ }catch(Exception e){
+ throw new RuntimeException(e);
+ }
+ }
+} \ No newline at end of file
diff --git a/clientserver/src/mill/clientserver/Locks.java b/clientserver/src/mill/clientserver/Locks.java
new file mode 100644
index 00000000..78c8dc6e
--- /dev/null
+++ b/clientserver/src/mill/clientserver/Locks.java
@@ -0,0 +1,105 @@
+package mill.clientserver;
+
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+abstract class Lock{
+ abstract public Locked lock() throws Exception;
+ abstract public Locked tryLock() throws Exception;
+ public void await() throws Exception{
+ lock().release();
+ }
+
+ /**
+ * Returns `true` if the lock is *available for taking*
+ */
+ abstract public boolean probe() throws Exception;
+}
+interface Locked{
+ void release() throws Exception;
+}
+class Locks{
+ public Lock processLock;
+ public Lock serverLock;
+ public Lock clientLock;
+ static Locks files(String lockBase) throws Exception{
+ return new Locks(){{
+ processLock = new FileLock(lockBase + "/pid");
+
+ serverLock = new FileLock(lockBase + "/serverLock");
+
+ clientLock = new FileLock(lockBase + "/clientLock");
+ }};
+ }
+ static Locks memory(){
+ return new Locks(){{
+ this.processLock = new MemoryLock();
+ this.serverLock = new MemoryLock();
+ this.clientLock = new MemoryLock();
+ }};
+ }
+}
+class FileLocked implements Locked{
+ private java.nio.channels.FileLock lock;
+ public FileLocked(java.nio.channels.FileLock lock){
+ this.lock = lock;
+ }
+ public void release() throws Exception{
+ this.lock.release();
+ }
+}
+
+class FileLock extends Lock{
+ String path;
+ RandomAccessFile raf;
+ FileChannel chan;
+ public FileLock(String path) throws Exception{
+ this.path = path;
+ raf = new RandomAccessFile(path, "rw");
+ chan = raf.getChannel();
+ }
+
+ public Locked lock() throws Exception{
+ return new FileLocked(chan.lock());
+ }
+ public Locked tryLock() throws Exception{
+ java.nio.channels.FileLock l = chan.tryLock();
+ if (l == null) return null;
+ else return new FileLocked(l);
+ }
+ public boolean probe()throws Exception{
+ java.nio.channels.FileLock l = chan.tryLock();
+ if (l == null) return false;
+ else {
+ l.release();
+ return true;
+ }
+ }
+}
+class MemoryLocked implements Locked{
+ java.util.concurrent.locks.Lock l;
+ public MemoryLocked(java.util.concurrent.locks.Lock l){
+ this.l = l;
+ }
+ public void release() throws Exception{
+ l.unlock();
+ }
+}
+
+class MemoryLock extends Lock{
+ ReentrantLock innerLock = new ReentrantLock(true);
+
+ public boolean probe(){
+ return !innerLock.isLocked();
+ }
+ public Locked lock() {
+ innerLock.lock();
+ return new MemoryLocked(innerLock);
+ }
+ public Locked tryLock() {
+ if (innerLock.tryLock()) return new MemoryLocked(innerLock);
+ else return null;
+ }
+}
diff --git a/clientserver/src/mill/clientserver/Locks.scala b/clientserver/src/mill/clientserver/Locks.scala
deleted file mode 100644
index d1644719..00000000
--- a/clientserver/src/mill/clientserver/Locks.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-package mill.clientserver
-
-import java.io.RandomAccessFile
-import java.nio.channels.FileChannel
-import java.util.concurrent.locks.{ReadWriteLock, ReentrantLock}
-
-
-trait Lock{
- def lock(): Locked
- def lockBlock[T](t: => T): T = {
- val l = lock()
- try t
- finally l.release()
- }
- def tryLockBlock[T](t: => T): Option[T] = {
- tryLock() match{
- case None =>
- None
- case Some(l) =>
- try Some(t)
- finally l.release()
- }
-
- }
- def tryLock(): Option[Locked]
- def await(): Unit = {
- val l = lock()
- l.release()
- }
-
- /**
- * Returns `true` if the lock is *available for taking*
- */
- def probe(): Boolean
-}
-trait Locked{
- def release(): Unit
-}
-trait Locks{
- val processLock: Lock
- val serverLock: Lock
- val clientLock: Lock
-}
-class FileLocked(lock: java.nio.channels.FileLock) extends Locked{
- def release() = {
- lock.release()
- }
-}
-
-class FileLock(path: String) extends Lock{
-
- val raf = new RandomAccessFile(path, "rw")
- val chan = raf.getChannel
- def lock() = {
- val lock = chan.lock()
- new FileLocked(lock)
- }
- def tryLock() = {
- chan.tryLock() match{
- case null => None
- case lock => Some(new FileLocked(lock))
- }
- }
- def probe(): Boolean = tryLock() match{
- case None => false
- case Some(locked) =>
- locked.release()
- true
- }
-}
-class FileLocks(lockBase: String) extends Locks{
- val processLock = new FileLock(lockBase + "/pid")
-
- val serverLock = new FileLock(lockBase + "/serverLock")
-
- val clientLock = new FileLock(lockBase + "/clientLock")
-}
-class MemoryLocked(l: java.util.concurrent.locks.Lock) extends Locked{
- def release() = l.unlock()
-}
-
-class MemoryLock() extends Lock{
- val innerLock = new ReentrantLock(true)
-
- def probe() = !innerLock.isLocked
- def lock() = {
- innerLock.lock()
- new MemoryLocked(innerLock)
- }
- def tryLock() = {
- innerLock.tryLock() match{
- case false => None
- case true => Some(new MemoryLocked(innerLock))
- }
- }
-}
-class MemoryLocks() extends Locks{
- val processLock = new MemoryLock()
-
- val serverLock = new MemoryLock()
-
- val clientLock = new MemoryLock()
-} \ No newline at end of file
diff --git a/clientserver/src/mill/clientserver/Server.scala b/clientserver/src/mill/clientserver/Server.scala
index 04ddbf0c..27c43302 100644
--- a/clientserver/src/mill/clientserver/Server.scala
+++ b/clientserver/src/mill/clientserver/Server.scala
@@ -12,7 +12,7 @@ trait ServerMain[T]{
this,
() => System.exit(0),
60000,
- new FileLocks(args0(0))
+ Locks.files(args0(0))
).run()
}
var stateCache = Option.empty[T]
@@ -29,19 +29,19 @@ class Server[T](lockBase: String,
sm: ServerMain[T],
interruptServer: () => Unit,
acceptTimeout: Int,
- locks: Locks) extends ClientServer(lockBase){
+ locks: Locks) {
val originalStdout = System.out
def run() = {
- locks.processLock.tryLockBlock{
+ Server.tryLockBlock(locks.processLock){
var running = true
while (running) {
- locks.serverLock.lockBlock{
- new File(ioPath).delete()
- val ioSocket = new UnixDomainServerSocket(ioPath)
- val sockOpt = ClientServer.interruptWith(
+ Server.lockBlock(locks.serverLock){
+ new File(lockBase + "/io").delete()
+ val ioSocket = new UnixDomainServerSocket(lockBase + "/io")
+ val sockOpt = Server.interruptWith(
acceptTimeout,
- new UnixDomainSocket(ioPath).close(),
+ new UnixDomainSocket(lockBase + "/io").close(),
ioSocket.accept()
)
@@ -63,8 +63,9 @@ class Server[T](lockBase: String,
val currentOutErr = clientSocket.getOutputStream
val socketIn = clientSocket.getInputStream
- val argStream = new FileInputStream(runFile)
- val (interactive, args) = ClientServer.parseArgs(argStream)
+ val argStream = new FileInputStream(lockBase + "/run")
+ val interactive = argStream.read() != 0;
+ val args = ClientServer.parseArgs(argStream)
argStream.close()
var done = false
@@ -83,7 +84,7 @@ class Server[T](lockBase: String,
sm.stateCache = newStateCache
java.nio.file.Files.write(
- java.nio.file.Paths.get(exitCodePath),
+ java.nio.file.Paths.get(lockBase + "/exitCode"),
(if (result) 0 else 1).toString.getBytes
)
} catch{case WatchInterrupted(sc: Option[T]) =>
@@ -108,4 +109,56 @@ class Server[T](lockBase: String,
clientSocket.close()
}
}
+object Server{
+ def lockBlock[T](lock: Lock)(t: => T): T = {
+ val l = lock.lock()
+ try t
+ finally l.release()
+ }
+ def tryLockBlock[T](lock: Lock)(t: => T): Option[T] = {
+ lock.tryLock() match{
+ case null => None
+ case l =>
+ try Some(t)
+ finally l.release()
+ }
+
+ }
+ def interruptWith[T](millis: Int, close: => Unit, t: => T): Option[T] = {
+ @volatile var interrupt = true
+ @volatile var interrupted = false
+ new Thread(() => {
+ Thread.sleep(millis)
+ if (interrupt) {
+ interrupted = true
+ close
+ }
+ }).start()
+
+ try {
+ val res =
+ try Some(t)
+ catch {case e: Throwable => None}
+
+ if (interrupted) None
+ else res
+
+ } finally {
+ interrupt = false
+ }
+ }
+}
+
+class ProxyOutputStream(x: => java.io.OutputStream,
+ key: Int) extends java.io.OutputStream {
+ override def write(b: Int) = x.synchronized{
+ x.write(key)
+ x.write(b)
+ }
+}
+class ProxyInputStream(x: => java.io.InputStream) extends java.io.InputStream{
+ def read() = x.read()
+ override def read(b: Array[Byte], off: Int, len: Int) = x.read(b, off, len)
+ override def read(b: Array[Byte]) = x.read(b)
+}
case class WatchInterrupted[T](stateCache: Option[T]) extends Exception \ No newline at end of file