path: root/clientserver
diff options
Diffstat (limited to 'clientserver')
6 files changed, 0 insertions, 706 deletions
diff --git a/clientserver/src/mill/clientserver/ b/clientserver/src/mill/clientserver/
deleted file mode 100644
index c4cbc265..00000000
--- a/clientserver/src/mill/clientserver/
+++ /dev/null
@@ -1,219 +0,0 @@
-package mill.clientserver;
-import org.scalasbt.ipcsocket.*;
-import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Properties;
-public class Client {
- static void initServer(String lockBase, boolean setJnaNoSys) throws IOException,URISyntaxException{
- ArrayList<String> selfJars = new ArrayList<String>();
- ClassLoader current = Client.class.getClassLoader();
- while(current != null){
- if (current instanceof {
- URL[] urls = (( current).getURLs();
- for (URL url: urls) {
- selfJars.add(new File(url.toURI()).getCanonicalPath());
- }
- }
- current = current.getParent();
- }
- if (ClientServer.isJava9OrAbove) {
- selfJars.addAll(Arrays.asList(System.getProperty("java.class.path").split(File.pathSeparator)));
- }
- ArrayList<String> l = new java.util.ArrayList<String>();
- l.add("java");
- Properties props = System.getProperties();
- Iterator<String> keys = props.stringPropertyNames().iterator();
- while(keys.hasNext()){
- String k =;
- if (k.startsWith("MILL_")) l.add("-D" + k + "=" + props.getProperty(k));
- }
- if (setJnaNoSys) {
- l.add("-Djna.nosys=true");
- }
- l.add("-cp");
- l.add(String.join(File.pathSeparator, selfJars));
- l.add("mill.ServerMain");
- l.add(lockBase);
- new java.lang.ProcessBuilder()
- .command(l)
- .redirectOutput(new + "/logs"))
- .redirectError(new + "/logs"))
- .start();
- }
- public static void main(String[] args) throws Exception{
- boolean setJnaNoSys = System.getProperty("jna.nosys") == null;
- if (setJnaNoSys) {
- System.setProperty("jna.nosys", "true");
- }
- int index = 0;
- while (index < 5) {
- index += 1;
- String lockBase = "out/mill-worker-" + index;
- new;
- RandomAccessFile lockFile = new RandomAccessFile(lockBase + "/clientLock", "rw");
- FileChannel channel = lockFile.getChannel();
- java.nio.channels.FileLock tryLock = channel.tryLock();
- if (tryLock == null) {
- lockFile.close();
- channel.close();
- } else {
- int exitCode =
- lockBase,
- new Runnable() {
- @Override
- public void run() {
- try{
- initServer(lockBase, setJnaNoSys);
- }catch(Exception e){
- throw new RuntimeException(e);
- }
- }
- },
- Locks.files(lockBase),
- System.out,
- System.err,
- args
- );
- System.exit(exitCode);
- }
- }
- throw new Exception("Reached max process limit: " + 5);
- }
- public static int run(String lockBase,
- Runnable initServer,
- Locks locks,
- InputStream stdin,
- OutputStream stdout,
- OutputStream stderr,
- String[] args) throws Exception{
- FileOutputStream f = new FileOutputStream(lockBase + "/run");
- ClientServer.writeArgs(System.console() != null, args, f);
- f.close();
- boolean serverInit = false;
- if (locks.processLock.probe()) {
- serverInit = true;
- }
- while(locks.processLock.probe()) Thread.sleep(3);
- // Need to give sometime for Win32NamedPipeSocket to work
- // if the server is just initialized
- if (serverInit && ClientServer.isWindows) Thread.sleep(1000);
- Socket ioSocket = null;
- long retryStart = System.currentTimeMillis();
- while(ioSocket == null && System.currentTimeMillis() - retryStart < 1000){
- try{
- ioSocket = ClientServer.isWindows?
- new Win32NamedPipeSocket(ClientServer.WIN32_PIPE_PREFIX + new File(lockBase).getName())
- : new UnixDomainSocket(lockBase + "/io");
- }catch(Throwable e){
- Thread.sleep(1);
- }
- }
- if (ioSocket == null){
- throw new Exception("Failed to connect to server");
- }
- InputStream outErr = ioSocket.getInputStream();
- OutputStream in = ioSocket.getOutputStream();
- ClientOutputPumper outPump = new ClientOutputPumper(outErr, stdout, stderr);
- InputPumper inPump = new InputPumper(stdin, in, true);
- Thread outThread = new Thread(outPump);
- outThread.setDaemon(true);
- Thread inThread = new Thread(inPump);
- inThread.setDaemon(true);
- outThread.start();
- inThread.start();
- locks.serverLock.await();
- try{
- return Integer.parseInt(
- new BufferedReader(
- new InputStreamReader(
- new FileInputStream(lockBase + "/exitCode")
- )
- ).readLine()
- );
- } catch(Throwable e){
- return 1;
- }
- }
-class ClientOutputPumper implements Runnable{
- private InputStream src;
- private OutputStream dest1;
- private OutputStream dest2;
- public ClientOutputPumper(InputStream src, OutputStream dest1, OutputStream dest2){
- this.src = src;
- this.dest1 = dest1;
- this.dest2 = dest2;
- }
- public void run() {
- byte[] buffer = new byte[1024];
- int state = 0;
- boolean running = true;
- boolean first = true;
- while (running) {
- try {
- int n =;
- first = false;
- if (n == -1) running = false;
- else {
- int i = 0;
- while (i < n) {
- switch (state) {
- case 0:
- state = buffer[i] + 1;
- break;
- case 1:
- dest1.write(buffer[i]);
- state = 0;
- break;
- case 2:
- dest2.write(buffer[i]);
- state = 0;
- break;
- }
- i += 1;
- }
- dest1.flush();
- dest2.flush();
- }
- } catch (IOException e) {
- // Win32NamedPipeSocket input stream somehow doesn't return -1,
- // instead it throws an IOException whose message contains "ReadFile()".
- // However, if it throws an IOException before ever reading some bytes,
- // it could not connect to the server, so exit.
- if (ClientServer.isWindows && e.getMessage().contains("ReadFile()")) {
- if (first) {
- System.err.println("Failed to connect to server");
- System.exit(1);
- } else running = false;
- } else {
- e.printStackTrace();
- System.exit(1);
- }
- }
- }
- }
diff --git a/clientserver/src/mill/clientserver/ b/clientserver/src/mill/clientserver/
deleted file mode 100644
index e2e63dcf..00000000
--- a/clientserver/src/mill/clientserver/
+++ /dev/null
@@ -1,42 +0,0 @@
-package mill.clientserver;
-public class ClientServer {
- public static boolean isWindows = System.getProperty("").toLowerCase().startsWith("windows");
- public static boolean isJava9OrAbove = !System.getProperty("java.specification.version").startsWith("1.");
- // Windows named pipe prefix (see
- // Win32NamedPipeServerSocket automatically adds this as a prefix (if it is not already is prefixed),
- // but Win32NamedPipeSocket does not
- //
- public static String WIN32_PIPE_PREFIX = "\\\\.\\pipe\\";
- public static String[] parseArgs(InputStream argStream) throws IOException {
- int argsLength =;
- String[] args = new String[argsLength];
- for (int i = 0; i < args.length; i++) {
- int n =;
- byte[] arr = new byte[n];
- args[i] = new String(arr);
- }
- return args;
- }
- public static void writeArgs(Boolean interactive,
- String[] args,
- OutputStream argStream) throws IOException{
- argStream.write(interactive ? 1 : 0);
- argStream.write(args.length);
- int i = 0;
- while (i < args.length){
- argStream.write(args[i].length());
- argStream.write(args[i].getBytes());
- i += 1;
- }
- }
-} \ No newline at end of file
diff --git a/clientserver/src/mill/clientserver/ b/clientserver/src/mill/clientserver/
deleted file mode 100644
index 9f4bfb82..00000000
--- a/clientserver/src/mill/clientserver/
+++ /dev/null
@@ -1,37 +0,0 @@
-package mill.clientserver;
-public class InputPumper implements Runnable{
- private InputStream src;
- private OutputStream dest;
- private Boolean checkAvailable;
- public InputPumper(InputStream src,
- OutputStream dest,
- Boolean checkAvailable){
- this.src = src;
- this.dest = dest;
- this.checkAvailable = checkAvailable;
- }
- boolean running = true;
- public void run() {
- byte[] buffer = new byte[1024];
- try{
- while(running){
- if (checkAvailable && src.available() == 0) Thread.sleep(2);
- else {
- int n =;
- if (n == -1) running = false;
- else {
- dest.write(buffer, 0, n);
- dest.flush();
- }
- }
- }
- }catch(Exception e){
- throw new RuntimeException(e);
- }
- }
-} \ No newline at end of file
diff --git a/clientserver/src/mill/clientserver/ b/clientserver/src/mill/clientserver/
deleted file mode 100644
index 78c8dc6e..00000000
--- a/clientserver/src/mill/clientserver/
+++ /dev/null
@@ -1,105 +0,0 @@
-package mill.clientserver;
-import java.nio.channels.FileChannel;
-import java.util.concurrent.locks.ReentrantLock;
-abstract class Lock{
- abstract public Locked lock() throws Exception;
- abstract public Locked tryLock() throws Exception;
- public void await() throws Exception{
- lock().release();
- }
- /**
- * Returns `true` if the lock is *available for taking*
- */
- abstract public boolean probe() throws Exception;
-interface Locked{
- void release() throws Exception;
-class Locks{
- public Lock processLock;
- public Lock serverLock;
- public Lock clientLock;
- static Locks files(String lockBase) throws Exception{
- return new Locks(){{
- processLock = new FileLock(lockBase + "/pid");
- serverLock = new FileLock(lockBase + "/serverLock");
- clientLock = new FileLock(lockBase + "/clientLock");
- }};
- }
- static Locks memory(){
- return new Locks(){{
- this.processLock = new MemoryLock();
- this.serverLock = new MemoryLock();
- this.clientLock = new MemoryLock();
- }};
- }
-class FileLocked implements Locked{
- private java.nio.channels.FileLock lock;
- public FileLocked(java.nio.channels.FileLock lock){
- this.lock = lock;
- }
- public void release() throws Exception{
- this.lock.release();
- }
-class FileLock extends Lock{
- String path;
- RandomAccessFile raf;
- FileChannel chan;
- public FileLock(String path) throws Exception{
- this.path = path;
- raf = new RandomAccessFile(path, "rw");
- chan = raf.getChannel();
- }
- public Locked lock() throws Exception{
- return new FileLocked(chan.lock());
- }
- public Locked tryLock() throws Exception{
- java.nio.channels.FileLock l = chan.tryLock();
- if (l == null) return null;
- else return new FileLocked(l);
- }
- public boolean probe()throws Exception{
- java.nio.channels.FileLock l = chan.tryLock();
- if (l == null) return false;
- else {
- l.release();
- return true;
- }
- }
-class MemoryLocked implements Locked{
- java.util.concurrent.locks.Lock l;
- public MemoryLocked(java.util.concurrent.locks.Lock l){
- this.l = l;
- }
- public void release() throws Exception{
- l.unlock();
- }
-class MemoryLock extends Lock{
- ReentrantLock innerLock = new ReentrantLock(true);
- public boolean probe(){
- return !innerLock.isLocked();
- }
- public Locked lock() {
- innerLock.lock();
- return new MemoryLocked(innerLock);
- }
- public Locked tryLock() {
- if (innerLock.tryLock()) return new MemoryLocked(innerLock);
- else return null;
- }
diff --git a/clientserver/src/mill/clientserver/Server.scala b/clientserver/src/mill/clientserver/Server.scala
deleted file mode 100644
index 24827ac2..00000000
--- a/clientserver/src/mill/clientserver/Server.scala
+++ /dev/null
@@ -1,183 +0,0 @@
-package mill.clientserver
-import org.scalasbt.ipcsocket._
-trait ServerMain[T]{
- def main(args0: Array[String]): Unit = {
- new Server(
- args0(0),
- this,
- () => System.exit(0),
- 300000,
- Locks.files(args0(0))
- ).run()
- }
- var stateCache = Option.empty[T]
- def main0(args: Array[String],
- stateCache: Option[T],
- mainInteractive: Boolean,
- stdin: InputStream,
- stdout: PrintStream,
- stderr: PrintStream): (Boolean, Option[T])
-class Server[T](lockBase: String,
- sm: ServerMain[T],
- interruptServer: () => Unit,
- acceptTimeout: Int,
- locks: Locks) {
- val originalStdout = System.out
- def run() = {
- Server.tryLockBlock(locks.processLock){
- var running = true
- while (running) {
- Server.lockBlock(locks.serverLock){
- val (serverSocket, socketClose) = if (ClientServer.isWindows) {
- val socketName = ClientServer.WIN32_PIPE_PREFIX + new File(lockBase).getName
- (new Win32NamedPipeServerSocket(socketName), () => new Win32NamedPipeSocket(socketName).close())
- } else {
- val socketName = lockBase + "/io"
- new File(socketName).delete()
- (new UnixDomainServerSocket(socketName), () => new UnixDomainSocket(socketName).close())
- }
- val sockOpt = Server.interruptWith(
- acceptTimeout,
- socketClose(),
- serverSocket.accept()
- )
- sockOpt match{
- case None => running = false
- case Some(sock) =>
- try {
- handleRun(sock)
- serverSocket.close()
- }
- catch{case e: Throwable => e.printStackTrace(originalStdout) }
- }
- }
- // Make sure you give an opportunity for the client to probe the lock
- // and realize the server has released it to signal completion
- Thread.sleep(10)
- }
- }.getOrElse(throw new Exception("PID already present"))
- }
- def handleRun(clientSocket: Socket) = {
- val currentOutErr = clientSocket.getOutputStream
- val socketIn = clientSocket.getInputStream
- val argStream = new FileInputStream(lockBase + "/run")
- val interactive = != 0;
- val args = ClientServer.parseArgs(argStream)
- argStream.close()
- var done = false
- val t = new Thread(() =>
- try {
- val stdout = new PrintStream(new ProxyOutputStream(currentOutErr, 0), true)
- val stderr = new PrintStream(new ProxyOutputStream(currentOutErr, 1), true)
- val (result, newStateCache) = sm.main0(
- args,
- sm.stateCache,
- interactive,
- socketIn,
- stdout, stderr
- )
- sm.stateCache = newStateCache
- java.nio.file.Files.write(
- java.nio.file.Paths.get(lockBase + "/exitCode"),
- (if (result) 0 else 1).toString.getBytes
- )
- } catch{case WatchInterrupted(sc: Option[T]) =>
- sm.stateCache = sc
- } finally{
- done = true
- }
- )
- t.start()
- // We cannot simply use Lock#await here, because the filesystem doesn't
- // realize the clientLock/serverLock are held by different threads in the
- // two processes and gives a spurious deadlock error
- while(!done && !locks.clientLock.probe()) {
- Thread.sleep(3)
- }
- if (!done) interruptServer()
- t.interrupt()
- t.stop()
- if (ClientServer.isWindows) {
- // Closing Win32NamedPipeSocket can often take ~5s
- // It seems OK to exit the client early and subsequently
- // start up mill client again (perhaps closing the server
- // socket helps speed up the process).
- val t = new Thread(() => clientSocket.close())
- t.setDaemon(true)
- t.start()
- } else clientSocket.close()
- }
-object Server{
- def lockBlock[T](lock: Lock)(t: => T): T = {
- val l = lock.lock()
- try t
- finally l.release()
- }
- def tryLockBlock[T](lock: Lock)(t: => T): Option[T] = {
- lock.tryLock() match{
- case null => None
- case l =>
- try Some(t)
- finally l.release()
- }
- }
- def interruptWith[T](millis: Int, close: => Unit, t: => T): Option[T] = {
- @volatile var interrupt = true
- @volatile var interrupted = false
- new Thread(() => {
- Thread.sleep(millis)
- if (interrupt) {
- interrupted = true
- close
- }
- }).start()
- try {
- val res =
- try Some(t)
- catch {case e: Throwable => None}
- if (interrupted) None
- else res
- } finally {
- interrupt = false
- }
- }
-class ProxyOutputStream(x: =>,
- key: Int) extends {
- override def write(b: Int) = x.synchronized{
- x.write(key)
- x.write(b)
- }
-class ProxyInputStream(x: => extends{
- def read() =
- override def read(b: Array[Byte], off: Int, len: Int) =, off, len)
- override def read(b: Array[Byte]) =
-case class WatchInterrupted[T](stateCache: Option[T]) extends Exception
diff --git a/clientserver/test/src/mill/clientserver/ClientServerTests.scala b/clientserver/test/src/mill/clientserver/ClientServerTests.scala
deleted file mode 100644
index 2c9a57b0..00000000
--- a/clientserver/test/src/mill/clientserver/ClientServerTests.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-package mill.clientserver
-import java.nio.file.Path
-import utest._
-class EchoServer extends ServerMain[Int]{
- def main0(args: Array[String],
- stateCache: Option[Int],
- mainInteractive: Boolean,
- stdin: InputStream,
- stdout: PrintStream,
- stderr: PrintStream) = {
- val reader = new BufferedReader(new InputStreamReader(stdin))
- val str = reader.readLine()
- stdout.println(str + args(0))
- stdout.flush()
- stderr.println(str.toUpperCase + args(0))
- stderr.flush()
- (true, None)
- }
-object ClientServerTests extends TestSuite{
- def initStreams() = {
- val in = new ByteArrayInputStream("hello\n".getBytes())
- val out = new ByteArrayOutputStream()
- val err = new ByteArrayOutputStream()
- (in, out, err)
- }
- def init() = {
- val tmpDir = java.nio.file.Files.createTempDirectory("")
- val locks = Locks.memory()
- (tmpDir, locks)
- }
- def tests = Tests{
- 'hello - {
- val (tmpDir, locks) = init()
- def spawnEchoServer(): Unit = {
- new Thread(() => new Server(
- tmpDir.toString,
- new EchoServer(),
- () => (),
- 1000,
- locks
- ).run()).start()
- }
- def runClient(arg: String) = {
- val (in, out, err) = initStreams()
- Server.lockBlock(locks.clientLock){
- tmpDir.toString,
- () => spawnEchoServer(),
- locks,
- in,
- out,
- err,
- Array(arg)
- )
- Thread.sleep(100)
- (new String(out.toByteArray), new String(err.toByteArray))
- }
- }
- // Make sure the simple "have the client start a server and
- // exchange one message" workflow works from end to end.
- assert(
- locks.clientLock.probe(),
- locks.serverLock.probe(),
- locks.processLock.probe()
- )
- val (out1, err1) = runClient("world")
- assert(
- out1 == "helloworld\n",
- err1 == "HELLOworld\n"
- )
- // Give a bit of time for the server to release the lock and
- // re-acquire it to signal to the client that it's done
- Thread.sleep(100)
- assert(
- locks.clientLock.probe(),
- !locks.serverLock.probe(),
- !locks.processLock.probe()
- )
- // A seecond client in sequence connect to the same server
- val (out2, err2) = runClient(" WORLD")
- assert(
- out2 == "hello WORLD\n",
- err2 == "HELLO WORLD\n"
- )
- // Make sure the server times out of not used for a while
- Thread.sleep(2000)
- assert(
- locks.clientLock.probe(),
- locks.serverLock.probe(),
- locks.processLock.probe()
- )
- // Have a third client spawn/connect-to a new server at the same path
- val (out3, err3) = runClient(" World")
- assert(
- out3 == "hello World\n",
- err3 == "HELLO World\n"
- )
- }
- }