summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLi Haoyi <haoyi.sg@gmail.com>2018-02-20 20:34:01 -0800
committerLi Haoyi <haoyi.sg@gmail.com>2018-02-20 20:34:01 -0800
commitd35e590619e38c4785b075a54a2c3c99ba609376 (patch)
tree2dc7bf33d5949cd3b011659b99ccfe60ab4a9407
parent3b82bef0ab30114b9f659475fc1c46acab028068 (diff)
downloadmill-d35e590619e38c4785b075a54a2c3c99ba609376.tar.gz
mill-d35e590619e38c4785b075a54a2c3c99ba609376.tar.bz2
mill-d35e590619e38c4785b075a54a2c3c99ba609376.zip
WIP
-rw-r--r--main/src/mill/ServerClient.scala373
1 files changed, 233 insertions, 140 deletions
diff --git a/main/src/mill/ServerClient.scala b/main/src/mill/ServerClient.scala
index 038d537c..9a2139b1 100644
--- a/main/src/mill/ServerClient.scala
+++ b/main/src/mill/ServerClient.scala
@@ -1,19 +1,30 @@
package mill
import java.io._
+import java.nio.channels.FileChannel
import java.util
import ammonite.main.Cli
import mill.main.MainRunner
+
+class ServerClient(lockBase: String){
+ val inFile = new java.io.File(lockBase + "/stdin")
+ val outErrFile = new java.io.File(lockBase + "/stdouterr")
+ val metaFile = new java.io.File(lockBase + "/stdmeta")
+ val logFile = new java.io.File(lockBase + "/log")
+ val runFile = new java.io.File(lockBase + "/run")
+ val tmpRunFile = new java.io.File(lockBase + "/run-tmp")
+ val pidFile = new java.io.File(lockBase + "/pid")
+}
object Client{
def WithLock[T](index: Int)(f: String => T): T = {
val lockBase = "out/mill-worker-" + index
new java.io.File(lockBase).mkdirs()
- val raf = new RandomAccessFile(lockBase+ "/lock", "rw")
- val channel = raf.getChannel
+ val lockFile = new RandomAccessFile(lockBase+ "/lock", "rw")
+ val channel = lockFile.getChannel
channel.tryLock() match{
case null =>
- raf.close()
+ lockFile.close()
channel.close()
if (index < 5) WithLock(index + 1)(f)
else throw new Exception("Reached max process limit: " + 5)
@@ -21,7 +32,7 @@ object Client{
try f(lockBase)
finally{
locked.release()
- raf.close()
+ lockFile.close()
channel.close()
}
}
@@ -29,102 +40,129 @@ object Client{
def main(args: Array[String]): Unit = {
WithLock(1) { lockBase =>
+ new Client(lockBase).run(args)
+ }
+ }
+}
- val inFile = new java.io.File(lockBase + "/stdin")
- val outErrFile = new java.io.File(lockBase + "/stdouterr")
- val metaFile = new java.io.File(lockBase + "/stdmeta")
- val runFile = new java.io.File(lockBase + "/run")
- val tmpRunFile = new java.io.File(lockBase + "/run-tmp")
- val pidFile = new java.io.File(lockBase + "/pid")
- outErrFile.delete()
- metaFile.delete()
- outErrFile.createNewFile()
- metaFile.createNewFile()
- inFile.createNewFile()
- inFile.createNewFile()
-
- val f = new FileOutputStream(tmpRunFile)
- f.write(if (System.console() != null) 1 else 0)
- f.write(args.length)
- var i = 0
- while (i < args.length){
- f.write(args(i).length)
- f.write(args(i).getBytes)
- i += 1
- }
- f.flush()
+class Client(lockBase: String) extends ServerClient(lockBase){
+ def run(args: Array[String]) = {
- tmpRunFile.renameTo(runFile)
+ outErrFile.delete()
+ metaFile.delete()
+ outErrFile.createNewFile()
+ metaFile.createNewFile()
+ inFile.createNewFile()
+ inFile.createNewFile()
+ logFile.createNewFile()
- val in = new FileOutputStream(inFile)
- val outErr = new FileInputStream(outErrFile)
- val meta = new FileInputStream(metaFile)
+ val f = new FileOutputStream(tmpRunFile)
+ f.write(if (System.console() != null) 1 else 0)
+ f.write(args.length)
+ var i = 0
+ while (i < args.length){
+ f.write(args(i).length)
+ f.write(args(i).getBytes)
+ i += 1
+ }
+ f.flush()
- if (!pidFile.exists()){
- val selfJar = getClass.getProtectionDomain.getCodeSource.getLocation.toURI.getPath
+ tmpRunFile.renameTo(runFile)
+ val in = new FileOutputStream(inFile)
+ val outErr = new FileInputStream(outErrFile)
+ val meta = new FileInputStream(metaFile)
- val l = new java.util.ArrayList[String]
- l.add("java")
- val props = System.getProperties
- val keys = props.stringPropertyNames().iterator()
- while(keys.hasNext){
- val k = keys.next()
- if (k.startsWith("MILL_")) l.add("-D" + k + "=" + props.getProperty(k))
- }
- l.add("-cp")
- l.add(selfJar)
- l.add("mill.Server")
- l.add(lockBase.toString)
-
- new java.lang.ProcessBuilder()
- .command(l)
- .redirectInput(inFile)
- .redirectOutput(outErrFile)
- .redirectError(outErrFile)
- .start()
+ val pidRaf = new RandomAccessFile(pidFile, "rw")
+ val pidLockChannel = pidRaf.getChannel
+
+ if (!probeLock(pidLockChannel)){
+ val selfJar = getClass.getProtectionDomain.getCodeSource.getLocation.toURI.getPath
+
+ val l = new java.util.ArrayList[String]
+ l.add("java")
+ val props = System.getProperties
+ val keys = props.stringPropertyNames().iterator()
+ while(keys.hasNext){
+ val k = keys.next()
+ if (k.startsWith("MILL_")) l.add("-D" + k + "=" + props.getProperty(k))
}
+ l.add("-cp")
+ l.add(selfJar)
+ l.add("mill.Server")
+ l.add(lockBase.toString)
+
+ new java.lang.ProcessBuilder()
+ .command(l)
+ .redirectInput(inFile)
+ .redirectOutput(logFile)
+ .redirectError(logFile)
+ .start()
+ }
+
+ while(!probeLock(pidLockChannel)) Thread.sleep(3)
+
+ try {
val buffer = new Array[Byte](1024)
val metaBuffer = new Array[Byte](1024)
- while({
- Thread.sleep(1)
- while({
- (if (outErr.available() > 0){
- val outErrN = outErr.read(buffer)
- if (outErrN > 0) {
- var metaN = 0
- while (metaN < outErrN) {
- val delta = meta.read(metaBuffer, 0, outErrN - metaN)
- if (delta > 0) {
- var i = 0
- while (i < delta) {
- metaBuffer(i) match {
- case 0 => System.out.write(buffer(metaN + i))
- case 1 => System.err.write(buffer(metaN + i))
- }
- i += 1
- }
- metaN += delta
- }
- }
- }
-
- true
- }else false) |
+ while ({
+ Thread.sleep(3)
+ while ( {
+ forwardForked(buffer, metaBuffer, meta, outErr) |
forward(buffer, System.in, in)
- })()
+ }) ()
- runFile.exists()
- })()
+ runFile.exists() && probeLock(pidLockChannel)
+ }) ()
+ }finally {
+ pidLockChannel.close()
+// pidFile.delete()
inFile.delete()
outErrFile.delete()
metaFile.delete()
+ }
+ }
+ def probeLock(pidLockChannel: FileChannel) = {
+
+ pidLockChannel.tryLock() match{
+ case null => true
+ case locked =>
+ locked.release()
+ false
}
+
}
+ def forwardForked(buffer: Array[Byte],
+ metaBuffer: Array[Byte],
+ meta: InputStream,
+ outErr: InputStream) = {
+ if (outErr.available() > 0){
+ val outErrN = outErr.read(buffer)
+ if (outErrN > 0) {
+ var metaN = 0
+ while (metaN < outErrN) {
+ val delta = meta.read(metaBuffer, 0, outErrN - metaN)
+ if (delta > 0) {
+ var i = 0
+ while (i < delta) {
+ metaBuffer(i) match {
+ case 0 => System.out.write(buffer(metaN + i))
+ case 1 => System.err.write(buffer(metaN + i))
+ }
+ i += 1
+ }
+ metaN += delta
+ }
+ }
+ }
+
+ true
+ }else false
+ }
def forward(buffer: Array[Byte], src: InputStream, dest: OutputStream) = {
if (src.available() != 0){
val n = src.read(buffer)
@@ -147,72 +185,127 @@ class ProxyInputStream(x: => java.io.InputStream) extends java.io.InputStream{
override def read(b: Array[Byte], off: Int, len: Int) = x.read(b, off, len)
override def read(b: Array[Byte]) = x.read(b)
}
+
object Server{
def main(args0: Array[String]): Unit = {
- import java.nio.file.{Paths, Files}
- val lockBase = Paths.get(args0(0))
- val runFile = lockBase.resolve("run")
- val inFile = lockBase.resolve("stdin")
- val outErrFile = lockBase.resolve("stdouterr")
- val metaFile = lockBase.resolve("stdmeta")
- var lastRun = System.currentTimeMillis()
- val pidFile = lockBase.resolve("pid")
- var currentIn = System.in
- var currentOutErr: OutputStream = System.out
- var currentMeta: OutputStream = System.err
- val raf = new RandomAccessFile(lockBase + "/lock", "rw")
- val channel = raf.getChannel
-
- Files.createFile(pidFile)
- var mainRunner = Option.empty[(Cli.Config, MainRunner)]
+ new Server(args0(0)).run()
+
+ }
+}
+
+class ProbeThread(lockChannel: FileChannel,
+ mainThread: Thread,
+ log: PrintStream) extends Runnable{
+ var running = true
+ def run() = {
+ while({
+ Thread.sleep(1)
+ lockChannel.tryLock() match{
+ case null => true && running
+ case locked =>
+ locked.release()
+ lockChannel.close()
+ System.exit(0)
+ false
+ }
+ })()
+ }
+}
+
+class Server(lockBase: String) extends ServerClient(lockBase){
+ var lastRun = System.currentTimeMillis()
+ var currentIn = System.in
+ var currentOutErr: OutputStream = System.out
+ var currentMeta: OutputStream = System.err
+ val lockFile = new RandomAccessFile(lockBase + "/lock", "rw")
+ val channel = lockFile.getChannel
+ var mainRunner = Option.empty[(Cli.Config, MainRunner)]
+
+ def run() = {
+ val originalStdout = System.out
+ originalStdout.println("Initializing")
+ val pidRaf = new RandomAccessFile(pidFile, "rw")
+ val lockChannel = pidRaf.getChannel
+ val lock = lockChannel.tryLock()
+ if (lock == null) throw new Exception("PID already present")
+ originalStdout.println("Locked pid file")
try {
while (System.currentTimeMillis() - lastRun < 60000) {
- if (!Files.exists(runFile)) Thread.sleep(10)
- else {
- currentIn = Files.newInputStream(inFile)
- currentOutErr = Files.newOutputStream(outErrFile)
- currentMeta = Files.newOutputStream(metaFile)
- val argStream = Files.newInputStream(runFile)
- val interactive = argStream.read() != 0
- val argsLength = argStream.read()
- val args = Array.fill(argsLength){
- val n = argStream.read()
- val arr = new Array[Byte](n)
- argStream.read(arr)
- new String(arr)
- }
- try {
- val (_, mr) = mill.Main.main0(
- args,
- mainRunner,
- interactive,
- () => {
- channel.tryLock() match{
- case null =>
- false
- case lock =>
- lock.release()
- true
- }
- },
- new ProxyInputStream(currentIn),
- new PrintStream(new ProxyOutputStream(currentOutErr, currentMeta, 0), true),
- new PrintStream(new ProxyOutputStream(currentOutErr, currentMeta, 1), true)
- )
-
- mainRunner = mr
- } catch{case MainRunner.WatchInterrupted(mr) =>
- mainRunner = Some((mr.config, mr))
- } finally{
- Files.delete(runFile)
-
- lastRun = System.currentTimeMillis()
- }
- }
+ pollOrRun(originalStdout)
+ originalStdout.println("Delta " + (System.currentTimeMillis() - lastRun))
+ originalStdout.println("Threads " + Thread.activeCount())
}
}finally{
- Files.delete(pidFile)
+ originalStdout.println("Exiting Server... " + System.currentTimeMillis())
+ lock.release()
+ lockChannel.close()
+ pidRaf.close()
+ pidFile.delete()
}
+ originalStdout.println("END")
}
-}
+ def pollOrRun(originalStdout: PrintStream) = {
+ if (!runFile.exists()) Thread.sleep(30)
+ else try{
+ originalStdout.println("Handling Run")
+ handleRun(originalStdout)
+ }catch{
+ case e: Throwable =>
+ originalStdout.println("Run Failed")
+ e.printStackTrace(originalStdout)
+ }finally{
+ lastRun = System.currentTimeMillis()
+ originalStdout.println("Updating lastRun " + lastRun)
+ }
+ }
+ def handleRun(originalStdout: PrintStream) = {
+ currentIn = new FileInputStream(inFile)
+ currentOutErr = new FileOutputStream(outErrFile)
+ currentMeta = new FileOutputStream(metaFile)
+ val lockChannel = lockFile.getChannel
+ val probe = new ProbeThread(lockChannel, Thread.currentThread(), originalStdout)
+ val probeThread = new Thread(probe)
+ probeThread.start()
+ val argStream = new FileInputStream(runFile)
+ val interactive = argStream.read() != 0
+ val argsLength = argStream.read()
+ val args = Array.fill(argsLength){
+ val n = argStream.read()
+ val arr = new Array[Byte](n)
+ argStream.read(arr)
+ new String(arr)
+ }
+ originalStdout.println("Parsed Args " + args.toList)
+ try {
+ originalStdout.println("Running Main")
+ val (_, mr) = mill.Main.main0(
+ args,
+ mainRunner,
+ interactive,
+ () => {
+ channel.tryLock() match{
+ case null =>
+ false
+ case lock =>
+ lock.release()
+ true
+ }
+ },
+ new ProxyInputStream(currentIn),
+ new PrintStream(new ProxyOutputStream(currentOutErr, currentMeta, 0), true),
+ new PrintStream(new ProxyOutputStream(currentOutErr, currentMeta, 1), true)
+ )
+ originalStdout.println("Finished Main")
+ mainRunner = mr
+ } catch{case MainRunner.WatchInterrupted(mr) =>
+ mainRunner = Some((mr.config, mr))
+ } finally{
+// lockChannel.close()
+// lockFile.close()
+ probe.running = false
+ probeThread.join()
+ runFile.delete()
+ }
+ }
+} \ No newline at end of file