package dotty package tools package vulpix import java.io.{ File => JFile, InputStreamReader, BufferedReader, PrintStream } import java.util.concurrent.TimeoutException import scala.concurrent.duration.Duration import scala.concurrent.{ Await, Future } import scala.concurrent.ExecutionContext.Implicits.global import scala.collection.mutable /** Vulpix spawns JVM subprocesses (`numberOfSlaves`) in order to run tests * without compromising the main JVM * * These need to be orchestrated in a safe manner with a simple protocol. This * interface provides just that. * * The protocol is defined as: * * - master sends classpath to for which to run `Test#main` and waits for * `maxDuration` * - slave invokes the method and waits until completion * - upon completion it sends back a `RunComplete` message * - the master checks if the child is still alive * - child is still alive, the output was valid * - child is dead, the output is the failure message * * If this whole chain of events is not completed within `maxDuration`, the * child process is destroyed and a new child is spawned. */ trait RunnerOrchestration { /** The maximum amount of active runners, which contain a child JVM */ def numberOfSlaves: Int /** The maximum duration the child process is allowed to consume before * getting destroyed */ def maxDuration: Duration /** Destroy and respawn process after each test */ def safeMode: Boolean /** Running a `Test` class's main method from the specified `dir` */ def runMain(classPath: String)(implicit summaryReport: SummaryReporting): Status = monitor.runMain(classPath) private[this] val monitor = new RunnerMonitor /** The runner monitor object keeps track of child JVM processes by keeping * them in two structures - one for free, and one for busy children. * * When a user calls `runMain` the monitor makes takes a free JVM and blocks * until the run is complete - or `maxDuration` has passed. It then performs * cleanup by returning the used JVM to the free list, or respawning it if * it died */ private class RunnerMonitor { def runMain(classPath: String)(implicit summaryReport: SummaryReporting): Status = withRunner(_.runMain(classPath)) private class Runner(private var process: Process) { private[this] var childStdout: BufferedReader = _ private[this] var childStdin: PrintStream = _ /** Checks if `process` is still alive * * When `process.exitValue()` is called on an active process the caught * exception is thrown. As such we can know if the subprocess exited or * not. */ def isAlive: Boolean = try { process.exitValue(); false } catch { case _: IllegalThreadStateException => true } /** Destroys the underlying process and kills IO streams */ def kill(): Unit = { if (process ne null) process.destroy() process = null childStdout = null childStdin = null } /** Did add hook to kill the child VMs? */ private[this] var didAddCleanupCallback = false /** Blocks less than `maxDuration` while running `Test.main` from `dir` */ def runMain(classPath: String)(implicit summaryReport: SummaryReporting): Status = { if (!didAddCleanupCallback) { // If for some reason the test runner (i.e. sbt) doesn't kill the VM, we // need to clean up ourselves. summaryReport.addCleanup(killAll) } assert(process ne null, "Runner was killed and then reused without setting a new process") // Makes the encapsulating RunnerMonitor spawn a new runner def respawn(): Unit = { process.destroy() process = createProcess childStdout = null childStdin = null } if (childStdin eq null) childStdin = new PrintStream(process.getOutputStream, /* autoFlush = */ true) // pass file to running process childStdin.println(classPath) // Create a future reading the object: val readOutput = Future { val sb = new StringBuilder if (childStdout eq null) childStdout = new BufferedReader(new InputStreamReader(process.getInputStream)) var childOutput = childStdout.readLine() while (childOutput != ChildJVMMain.MessageEnd && childOutput != null) { sb.append(childOutput) sb += '\n' childOutput = childStdout.readLine() } if (process.isAlive && childOutput != null) Success(sb.toString) else Failure(sb.toString) } // Await result for `maxDuration` and then timout and destroy the // process: val status = try Await.result(readOutput, maxDuration) catch { case _: TimeoutException => Timeout } // Handle failure of the VM: status match { case _: Success if safeMode => respawn() case _: Success => // no need to respawn sub process case _: Failure => respawn() case Timeout => respawn() } status } } /** Create a process which has the classpath of the `ChildJVMMain` and the * scala library. */ private def createProcess: Process = { val sep = sys.props("file.separator") val cp = classOf[ChildJVMMain].getProtectionDomain.getCodeSource.getLocation.getFile + ":" + Jars.scalaLibraryFromRuntime val javaBin = sys.props("java.home") + sep + "bin" + sep + "java" new ProcessBuilder(javaBin, "-cp", cp, "dotty.tools.vulpix.ChildJVMMain") .redirectErrorStream(true) .redirectInput(ProcessBuilder.Redirect.PIPE) .redirectOutput(ProcessBuilder.Redirect.PIPE) .start() } private[this] val allRunners = List.fill(numberOfSlaves)(new Runner(createProcess)) private[this] val freeRunners = mutable.Queue(allRunners: _*) private[this] val busyRunners = mutable.Set.empty[Runner] private def getRunner(): Runner = synchronized { while (freeRunners.isEmpty) wait() val runner = freeRunners.dequeue() busyRunners += runner notify() runner } private def freeRunner(runner: Runner): Unit = synchronized { freeRunners.enqueue(runner) busyRunners -= runner notify() } private def withRunner[T](op: Runner => T): T = { val runner = getRunner() val result = op(runner) freeRunner(runner) result } private def killAll(): Unit = allRunners.foreach(_.kill()) // On shutdown, we need to kill all runners: sys.addShutdownHook(killAll()) } }