summaryrefslogtreecommitdiff
path: root/clientserver/src/mill/clientserver/Server.scala
blob: 52549f6927b17443e60fee0ad720ee3ef923d6e3 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package mill.clientserver

import java.io._
import java.net.Socket

import org.scalasbt.ipcsocket.{UnixDomainServerSocket, UnixDomainSocket}

/**
 * Entry point for a process that serves client requests through a [[Server]].
 *
 * Implementors supply `main0`, which is run once per client connection; the
 * `stateCache` held here is handed to every run and replaced by its result.
 *
 * @tparam T type of the state carried over from one run to the next
 */
trait ServerMain[T]{
  /**
   * Builds a [[Server]] around this instance and runs it.
   *
   * `args0(0)` is used both as the server's lock base and as the argument
   * of `FileLocks`; an empty `args0` therefore fails with an
   * `ArrayIndexOutOfBoundsException`.
   */
  def main(args0: Array[String]): Unit = {
    new Server(
      args0(0),
      this,
      () => System.exit(0),
      60000, // acceptTimeout -- presumably milliseconds, confirm against ClientServer.interruptWith
      new FileLocks(args0(0))
    ).run()
  }

  /**
   * State produced by the most recent run, `None` until a run has stored one.
   *
   * `Server.handleRun` reads and writes this from a freshly created worker
   * thread on every request, so it is `@volatile` to guarantee that each
   * request observes the value stored by the previous one.
   */
  @volatile var stateCache: Option[T] = Option.empty[T]

  /**
   * Runs a single client request.
   *
   * @param args             arguments of the request
   * @param stateCache       state left behind by the previous run, if any
   * @param mainInteractive  interactive flag of the request
   * @param watchInterrupted polled to find out whether the run should be
   *                         abandoned (the server passes a check of the
   *                         client lock)
   * @param stdin            input stream of the request
   * @param stdout           output stream of the request
   * @param stderr           error stream of the request
   * @return a pair whose second element becomes the new `stateCache`; the
   *         visible server code ignores the first element (presumably a
   *         success flag -- confirm with implementors)
   */
  def main0(args: Array[String],
            stateCache: Option[T],
            mainInteractive: Boolean,
            watchInterrupted: () => Boolean,
            stdin: InputStream,
            stdout: PrintStream,
            stderr: PrintStream): (Boolean, Option[T])
}


/**
 * Serves client requests one at a time over a Unix domain socket at `ioPath`,
 * running `sm.main0` for each accepted connection.
 *
 * `ioPath` and `runFile` are not defined in this class; presumably they are
 * inherited from `ClientServer` and derived from `lockBase` -- confirm there.
 *
 * @param lockBase        passed to the `ClientServer` superclass
 * @param sm              provides `main0` and holds the state cache shared
 *                        between runs
 * @param interruptServer not referenced anywhere in this class body
 * @param acceptTimeout   timeout handed to `ClientServer.interruptWith` while
 *                        waiting for a client to connect
 * @param locks           process, server and client locks used to coordinate
 *                        with clients
 * @tparam T type of the state cached between runs
 */
class Server[T](lockBase: String,
                sm: ServerMain[T],
                interruptServer: () => Unit,
                acceptTimeout: Int,
                locks: Locks) extends ClientServer(lockBase){

  // Captured at construction time and used to report failures of a request.
  val originalStdout = System.out

  /**
   * Accepts and serves clients until no client connects within
   * `acceptTimeout`.
   *
   * @throws Exception with message "PID already present" when
   *                   `locks.processLock.tryLockBlock` yields no result
   */
  def run(): Unit = {
    locks.processLock.tryLockBlock{
      var running = true
      while (running) locks.serverLock.lockBlock{
        // Remove a stale socket file before binding a fresh server socket.
        new File(ioPath).delete()
        val ioSocket = new UnixDomainServerSocket(ioPath)
        // The second argument connects to our own socket and closes the
        // connection again; presumably interruptWith runs it on timeout to
        // unblock the pending accept() -- confirm in ClientServer.
        val sockOpt = ClientServer.interruptWith(
          acceptTimeout,
          new UnixDomainSocket(ioPath).close(),
          ioSocket.accept()
        )

        sockOpt match{
          // No client showed up: leave the loop. The server socket is left
          // untouched here because the loop, and with it run(), is ending.
          case None => running = false
          case Some(sock) =>
            try {
              try handleRun(sock)
              // A new server socket is created on every iteration, so the
              // current one has to be released or every served request
              // leaks a descriptor.
              finally ioSocket.close()
            } catch{case e: Throwable => e.printStackTrace(originalStdout) }
        }
      }
    }.getOrElse(throw new Exception("PID already present"))
  }

  /**
   * Serves one client: reads its arguments from `runFile`, runs `sm.main0`
   * on a worker thread wired to the socket, and waits until the run finishes
   * or `locks.clientLock.probe()` returns true.
   *
   * The client socket is closed on every exit path, including failures.
   *
   * @param clientSocket accepted connection; its input stream is passed to
   *                     `main0` as stdin and its output stream carries both
   *                     stdout and stderr through `ProxyOutputStream`
   */
  def handleRun(clientSocket: Socket): Unit = {
    try {
      val currentOutErr = clientSocket.getOutputStream
      val socketIn = clientSocket.getInputStream
      val argStream = new FileInputStream(runFile)
      val (interactive, args) =
        try ClientServer.parseArgs(argStream)
        finally argStream.close()

      // Written by the worker thread and polled by this one; must be
      // volatile so that the polling loop is guaranteed to see the update.
      @volatile var done = false
      val t = new Thread(() =>

        try {
          val stdout = new PrintStream(new ProxyOutputStream(currentOutErr, 0), true)
          val stderr = new PrintStream(new ProxyOutputStream(currentOutErr, 1), true)
          val (_, newStateCache) = sm.main0(
            args,
            sm.stateCache,
            interactive,
            () => !locks.clientLock.probe(),
            socketIn,
            stdout, stderr
          )

          sm.stateCache = newStateCache
        } catch{case WatchInterrupted(sc: Option[T]) =>
          // The run was abandoned but still hands back a state to keep.
          sm.stateCache = sc
        } finally{
          done = true
        }
      )

      t.start()
      // We cannot simply use Lock#await here, because the filesystem doesn't
      // realize the clientLock/serverLock are held by different threads in the
      // two processes and gives a spurious deadlock error
      while(!done && !locks.clientLock.probe()) {
        Thread.sleep(3)
      }

      // The run may still be in progress when the loop exits because of the
      // client lock, so the worker is interrupted and then forcibly stopped.
      t.interrupt()
      t.stop()
    } finally {
      clientSocket.close()
    }
  }
}
/**
 * Exception that carries a state cache out of an abandoned run.
 *
 * `Server.handleRun` catches it on the worker thread and stores the carried
 * value in `ServerMain.stateCache`. Where it is thrown is not visible in this
 * file -- presumably by `main0` implementations once their `watchInterrupted`
 * callback reports true; confirm with the implementors.
 *
 * @param stateCache state to keep for the next run
 * @tparam T type of the cached state
 */
case class WatchInterrupted[T](stateCache: Option[T]) extends Exception