summaryrefslogtreecommitdiff
path: root/clientserver/src
diff options
context:
space:
mode:
authorLi Haoyi <haoyi.sg@gmail.com>2018-02-24 10:39:20 -0800
committerLi Haoyi <haoyi.sg@gmail.com>2018-02-24 17:13:03 -0800
commite76a4eda93e96e6817ab4f337f567fd325efe545 (patch)
tree76067854dc43facac259df017baf35de2aed221e /clientserver/src
parentc98408adf2d96928fe227a740631a8efd8e0c339 (diff)
downloadmill-e76a4eda93e96e6817ab4f337f567fd325efe545.tar.gz
mill-e76a4eda93e96e6817ab4f337f567fd325efe545.tar.bz2
mill-e76a4eda93e96e6817ab4f337f567fd325efe545.zip
.
wip
Diffstat (limited to 'clientserver/src')
-rw-r--r--clientserver/src/mill/clientserver/Client.scala9
-rw-r--r--clientserver/src/mill/clientserver/ClientServer.scala22
-rw-r--r--clientserver/src/mill/clientserver/Server.scala40
3 files changed, 49 insertions, 22 deletions
diff --git a/clientserver/src/mill/clientserver/Client.scala b/clientserver/src/mill/clientserver/Client.scala
index dcf65271..b7a39cb2 100644
--- a/clientserver/src/mill/clientserver/Client.scala
+++ b/clientserver/src/mill/clientserver/Client.scala
@@ -34,14 +34,17 @@ class Client(lockBase: String,
stdout: OutputStream,
stderr: OutputStream) extends ClientServer(lockBase){
def run(args: Array[String]) = {
-
+ println("Client Run")
val f = new FileOutputStream(runFile)
ClientServer.writeArgs(System.console() != null, args, f)
f.close()
if (locks.processLock.probe()) initServer()
while(locks.processLock.probe()) Thread.sleep(3)
+ println("Client Connect Socket")
+
val ioSocket = ClientServer.retry(1000, new UnixDomainSocket(ioPath))
+ println("Client Connected Socket")
val outErr = ioSocket.getInputStream
val in = ioSocket.getOutputStream
val outPump = new ClientOutputPumper(outErr, stdout, stderr)
@@ -52,6 +55,10 @@ class Client(lockBase: String,
inThread.setDaemon(true)
outThread.start()
inThread.start()
+ println("Client Await Server Lock")
+
locks.serverLock.await()
+ println("Client End")
+
}
}
diff --git a/clientserver/src/mill/clientserver/ClientServer.scala b/clientserver/src/mill/clientserver/ClientServer.scala
index 2cc38859..84ea2e00 100644
--- a/clientserver/src/mill/clientserver/ClientServer.scala
+++ b/clientserver/src/mill/clientserver/ClientServer.scala
@@ -48,17 +48,27 @@ object ClientServer{
}
}
- def interruptWith[T](millis: Int, close: => Unit)(t: => T): T = {
- var int = true
+ def interruptWith[T](millis: Int, close: => Unit)(t: => T): Option[T] = {
+ @volatile var interrupt = true
+ @volatile var interrupted = false
new Thread(() => {
Thread.sleep(millis)
- if (int) close
+ if (interrupt) {
+ close
+ interrupted = true
+ }
}).start()
- try t
- finally {
+ try {
+ val res =
+ try Some(t)
+ catch {case e: Throwable => None}
+
+ if (interrupted) None
+ else res
- int = false
+ } finally {
+ interrupt = false
}
}
diff --git a/clientserver/src/mill/clientserver/Server.scala b/clientserver/src/mill/clientserver/Server.scala
index ad2e35e4..1271933d 100644
--- a/clientserver/src/mill/clientserver/Server.scala
+++ b/clientserver/src/mill/clientserver/Server.scala
@@ -3,7 +3,7 @@ package mill.clientserver
import java.io._
import java.net.Socket
-import org.scalasbt.ipcsocket.UnixDomainServerSocket
+import org.scalasbt.ipcsocket.{UnixDomainServerSocket, UnixDomainSocket}
trait ServerMain[T]{
def main(args0: Array[String]): Unit = {
@@ -11,7 +11,7 @@ trait ServerMain[T]{
args0(0),
this,
() => System.exit(0),
- () => System.currentTimeMillis(),
+ 60000,
new FileLocks(args0(0))
).run()
}
@@ -29,30 +29,39 @@ trait ServerMain[T]{
class Server[T](lockBase: String,
sm: ServerMain[T],
interruptServer: () => Unit,
- currentTimeMillis: () => Long,
+ acceptTimeout: Int,
locks: Locks) extends ClientServer(lockBase){
val originalStdout = System.out
def run() = {
locks.processLock.tryLockBlock{
- var lastRun = currentTimeMillis()
- while (currentTimeMillis() - lastRun < 60000) locks.serverLock.lockBlock{
+ println("Server Process Lock")
+ var running = true
+ while (running) locks.serverLock.lockBlock{
+ println("Server Lock")
new File(ioPath).delete()
+ println("Server Accept Socket")
val ioSocket = new UnixDomainServerSocket(ioPath)
val sockOpt = ClientServer.interruptWith(
- 1000,
- ioSocket.close()
+ acceptTimeout,
+ {
+ println("Server Socket Timing Out Close")
+ try new UnixDomainSocket(ioPath).close()
+ catch{case e: Throwable => }
+ }
){
- try Some(ioSocket.accept())
- catch{ case e: IOException => None}
+ ioSocket.accept()
}
-
- sockOpt.foreach{sock =>
- try handleRun(sock)
- catch{case e: Throwable => e.printStackTrace(originalStdout) }
- finally lastRun = currentTimeMillis()
+ sockOpt match{
+ case None => running = false
+ case Some(sock) =>
+ println("Server Handle Run")
+ try handleRun(sock)
+ catch{case e: Throwable => e.printStackTrace(originalStdout) }
}
+ println("Server Unlock")
}
+ println("Server Process Unlock")
}.getOrElse(throw new Exception("PID already present"))
}
@@ -88,7 +97,7 @@ class Server[T](lockBase: String,
)
t.start()
-
+ println("Server Poll Client/Done")
// We cannot simply use Lock#await here, because the filesystem doesn't
// realize the clientLock/serverLock are held by different threads in the
// two processes and gives a spurious deadlock error
@@ -98,6 +107,7 @@ class Server[T](lockBase: String,
t.interrupt()
t.stop()
+ println("Server Socket Close")
clientSocket.close()
}
}