From e76a4eda93e96e6817ab4f337f567fd325efe545 Mon Sep 17 00:00:00 2001 From: Li Haoyi Date: Sat, 24 Feb 2018 10:39:20 -0800 Subject: . wip --- clientserver/src/mill/clientserver/Client.scala | 9 ++++- .../src/mill/clientserver/ClientServer.scala | 22 ++++++++---- clientserver/src/mill/clientserver/Server.scala | 40 ++++++++++++++-------- .../src/mill/clientserver/ClientServerTests.scala | 5 ++- 4 files changed, 53 insertions(+), 23 deletions(-) (limited to 'clientserver') diff --git a/clientserver/src/mill/clientserver/Client.scala b/clientserver/src/mill/clientserver/Client.scala index dcf65271..b7a39cb2 100644 --- a/clientserver/src/mill/clientserver/Client.scala +++ b/clientserver/src/mill/clientserver/Client.scala @@ -34,14 +34,17 @@ class Client(lockBase: String, stdout: OutputStream, stderr: OutputStream) extends ClientServer(lockBase){ def run(args: Array[String]) = { - + println("Client Run") val f = new FileOutputStream(runFile) ClientServer.writeArgs(System.console() != null, args, f) f.close() if (locks.processLock.probe()) initServer() while(locks.processLock.probe()) Thread.sleep(3) + println("Client Connect Socket") + val ioSocket = ClientServer.retry(1000, new UnixDomainSocket(ioPath)) + println("Client Connected Socket") val outErr = ioSocket.getInputStream val in = ioSocket.getOutputStream val outPump = new ClientOutputPumper(outErr, stdout, stderr) @@ -52,6 +55,10 @@ class Client(lockBase: String, inThread.setDaemon(true) outThread.start() inThread.start() + println("Client Await Server Lock") + locks.serverLock.await() + println("Client End") + } } diff --git a/clientserver/src/mill/clientserver/ClientServer.scala b/clientserver/src/mill/clientserver/ClientServer.scala index 2cc38859..84ea2e00 100644 --- a/clientserver/src/mill/clientserver/ClientServer.scala +++ b/clientserver/src/mill/clientserver/ClientServer.scala @@ -48,17 +48,27 @@ object ClientServer{ } } - def interruptWith[T](millis: Int, close: => Unit)(t: => T): T = { - var int = true + def interruptWith[T](millis: Int, close: => Unit)(t: => T): Option[T] = { + @volatile var interrupt = true + @volatile var interrupted = false new Thread(() => { Thread.sleep(millis) - if (int) close + if (interrupt) { + close + interrupted = true + } }).start() - try t - finally { + try { + val res = + try Some(t) + catch {case e: Throwable => None} + + if (interrupted) None + else res - int = false + } finally { + interrupt = false } } diff --git a/clientserver/src/mill/clientserver/Server.scala b/clientserver/src/mill/clientserver/Server.scala index ad2e35e4..1271933d 100644 --- a/clientserver/src/mill/clientserver/Server.scala +++ b/clientserver/src/mill/clientserver/Server.scala @@ -3,7 +3,7 @@ package mill.clientserver import java.io._ import java.net.Socket -import org.scalasbt.ipcsocket.UnixDomainServerSocket +import org.scalasbt.ipcsocket.{UnixDomainServerSocket, UnixDomainSocket} trait ServerMain[T]{ def main(args0: Array[String]): Unit = { @@ -11,7 +11,7 @@ trait ServerMain[T]{ args0(0), this, () => System.exit(0), - () => System.currentTimeMillis(), + 60000, new FileLocks(args0(0)) ).run() } @@ -29,30 +29,39 @@ trait ServerMain[T]{ class Server[T](lockBase: String, sm: ServerMain[T], interruptServer: () => Unit, - currentTimeMillis: () => Long, + acceptTimeout: Int, locks: Locks) extends ClientServer(lockBase){ val originalStdout = System.out def run() = { locks.processLock.tryLockBlock{ - var lastRun = currentTimeMillis() - while (currentTimeMillis() - lastRun < 60000) locks.serverLock.lockBlock{ + println("Server Process Lock") + var running = true + while (running) locks.serverLock.lockBlock{ + println("Server Lock") new File(ioPath).delete() + println("Server Accept Socket") val ioSocket = new UnixDomainServerSocket(ioPath) val sockOpt = ClientServer.interruptWith( - 1000, - ioSocket.close() + acceptTimeout, + { + println("Server Socket Timing Out Close") + try new UnixDomainSocket(ioPath).close() + catch{case e: Throwable => } + } ){ - try Some(ioSocket.accept()) - catch{ case e: IOException => None} + ioSocket.accept() } - - sockOpt.foreach{sock => - try handleRun(sock) - catch{case e: Throwable => e.printStackTrace(originalStdout) } - finally lastRun = currentTimeMillis() + sockOpt match{ + case None => running = false + case Some(sock) => + println("Server Handle Run") + try handleRun(sock) + catch{case e: Throwable => e.printStackTrace(originalStdout) } } + println("Server Unlock") } + println("Server Process Unlock") }.getOrElse(throw new Exception("PID already present")) } @@ -88,7 +97,7 @@ class Server[T](lockBase: String, ) t.start() - + println("Server Poll Client/Done") // We cannot simply use Lock#await here, because the filesystem doesn't // realize the clientLock/serverLock are held by different threads in the // two processes and gives a spurious deadlock error @@ -98,6 +107,7 @@ class Server[T](lockBase: String, t.interrupt() t.stop() + println("Server Socket Close") clientSocket.close() } } diff --git a/clientserver/test/src/mill/clientserver/ClientServerTests.scala b/clientserver/test/src/mill/clientserver/ClientServerTests.scala index ecf09ab3..55453c4f 100644 --- a/clientserver/test/src/mill/clientserver/ClientServerTests.scala +++ b/clientserver/test/src/mill/clientserver/ClientServerTests.scala @@ -46,7 +46,7 @@ object ClientServerTests extends TestSuite{ tmpDir.toString, new EchoServer(), () => (), - () => currentTimeMillis, + 1000, locks ).run()).start() } @@ -55,6 +55,7 @@ object ClientServerTests extends TestSuite{ def runClient(arg: String) = { val (in, out, err) = initStreams() locks.clientLock.lockBlock{ + println("Client Lock") val c = new Client( tmpDir.toString, () => spawnEchoServer(), @@ -64,6 +65,7 @@ object ClientServerTests extends TestSuite{ err ) c.run(Array(arg)) + println("Client unlock") (new String(out.toByteArray), new String(err.toByteArray)) } } @@ -99,6 +101,7 @@ object ClientServerTests extends TestSuite{ ) // Make sure the server times out of not used for a while + println("Sleep 2000") currentTimeMillis += 60001 Thread.sleep(2000) assert( -- cgit v1.2.3