diff options
Diffstat (limited to 'compiler/test/dotty/tools/vulpix/RunnerOrchestration.scala')
-rw-r--r-- | compiler/test/dotty/tools/vulpix/RunnerOrchestration.scala | 196 |
1 files changed, 196 insertions, 0 deletions
diff --git a/compiler/test/dotty/tools/vulpix/RunnerOrchestration.scala b/compiler/test/dotty/tools/vulpix/RunnerOrchestration.scala new file mode 100644 index 000000000..ad068e9ef --- /dev/null +++ b/compiler/test/dotty/tools/vulpix/RunnerOrchestration.scala @@ -0,0 +1,196 @@ +package dotty +package tools +package vulpix + +import java.io.{ File => JFile, InputStreamReader, BufferedReader, PrintStream } +import java.util.concurrent.TimeoutException + +import scala.concurrent.duration.Duration +import scala.concurrent.{ Await, Future } +import scala.concurrent.ExecutionContext.Implicits.global +import scala.collection.mutable + +/** Vulpix spawns JVM subprocesses (`numberOfSlaves`) in order to run tests + * without compromising the main JVM + * + * These need to be orchestrated in a safe manner with a simple protocol. This + * interface provides just that. + * + * The protocol is defined as: + * + * - master sends classpath to for which to run `Test#main` and waits for + * `maxDuration` + * - slave invokes the method and waits until completion + * - upon completion it sends back a `RunComplete` message + * - the master checks if the child is still alive + * - child is still alive, the output was valid + * - child is dead, the output is the failure message + * + * If this whole chain of events is not completed within `maxDuration`, the + * child process is destroyed and a new child is spawned. + */ +trait RunnerOrchestration { + + /** The maximum amount of active runners, which contain a child JVM */ + def numberOfSlaves: Int + + /** The maximum duration the child process is allowed to consume before + * getting destroyed + */ + def maxDuration: Duration + + /** Destroy and respawn process after each test */ + def safeMode: Boolean + + /** Running a `Test` class's main method from the specified `dir` */ + def runMain(classPath: String)(implicit summaryReport: SummaryReporting): Status = + monitor.runMain(classPath) + + private[this] val monitor = new RunnerMonitor + + /** The runner monitor object keeps track of child JVM processes by keeping + * them in two structures - one for free, and one for busy children. + * + * When a user calls `runMain` the monitor makes takes a free JVM and blocks + * until the run is complete - or `maxDuration` has passed. It then performs + * cleanup by returning the used JVM to the free list, or respawning it if + * it died + */ + private class RunnerMonitor { + + def runMain(classPath: String)(implicit summaryReport: SummaryReporting): Status = + withRunner(_.runMain(classPath)) + + private class Runner(private var process: Process) { + private[this] var childStdout: BufferedReader = _ + private[this] var childStdin: PrintStream = _ + + /** Checks if `process` is still alive + * + * When `process.exitValue()` is called on an active process the caught + * exception is thrown. As such we can know if the subprocess exited or + * not. + */ + def isAlive: Boolean = + try { process.exitValue(); false } + catch { case _: IllegalThreadStateException => true } + + /** Destroys the underlying process and kills IO streams */ + def kill(): Unit = { + if (process ne null) process.destroy() + process = null + childStdout = null + childStdin = null + } + + /** Did add hook to kill the child VMs? */ + private[this] var didAddCleanupCallback = false + + /** Blocks less than `maxDuration` while running `Test.main` from `dir` */ + def runMain(classPath: String)(implicit summaryReport: SummaryReporting): Status = { + if (!didAddCleanupCallback) { + // If for some reason the test runner (i.e. sbt) doesn't kill the VM, we + // need to clean up ourselves. + summaryReport.addCleanup(killAll) + } + assert(process ne null, + "Runner was killed and then reused without setting a new process") + + // Makes the encapsulating RunnerMonitor spawn a new runner + def respawn(): Unit = { + process.destroy() + process = createProcess + childStdout = null + childStdin = null + } + + if (childStdin eq null) + childStdin = new PrintStream(process.getOutputStream, /* autoFlush = */ true) + + // pass file to running process + childStdin.println(classPath) + + // Create a future reading the object: + val readOutput = Future { + val sb = new StringBuilder + + if (childStdout eq null) + childStdout = new BufferedReader(new InputStreamReader(process.getInputStream)) + + var childOutput = childStdout.readLine() + while (childOutput != ChildJVMMain.MessageEnd && childOutput != null) { + sb.append(childOutput) + sb += '\n' + childOutput = childStdout.readLine() + } + + if (process.isAlive && childOutput != null) Success(sb.toString) + else Failure(sb.toString) + } + + // Await result for `maxDuration` and then timout and destroy the + // process: + val status = + try Await.result(readOutput, maxDuration) + catch { case _: TimeoutException => Timeout } + + // Handle failure of the VM: + status match { + case _: Success if safeMode => respawn() + case _: Success => // no need to respawn sub process + case _: Failure => respawn() + case Timeout => respawn() + } + status + } + } + + /** Create a process which has the classpath of the `ChildJVMMain` and the + * scala library. + */ + private def createProcess: Process = { + val sep = sys.props("file.separator") + val cp = + classOf[ChildJVMMain].getProtectionDomain.getCodeSource.getLocation.getFile + ":" + + Jars.scalaLibraryFromRuntime + val javaBin = sys.props("java.home") + sep + "bin" + sep + "java" + new ProcessBuilder(javaBin, "-cp", cp, "dotty.tools.vulpix.ChildJVMMain") + .redirectErrorStream(true) + .redirectInput(ProcessBuilder.Redirect.PIPE) + .redirectOutput(ProcessBuilder.Redirect.PIPE) + .start() + } + + private[this] val allRunners = List.fill(numberOfSlaves)(new Runner(createProcess)) + private[this] val freeRunners = mutable.Queue(allRunners: _*) + private[this] val busyRunners = mutable.Set.empty[Runner] + + private def getRunner(): Runner = synchronized { + while (freeRunners.isEmpty) wait() + + val runner = freeRunners.dequeue() + busyRunners += runner + + notify() + runner + } + + private def freeRunner(runner: Runner): Unit = synchronized { + freeRunners.enqueue(runner) + busyRunners -= runner + notify() + } + + private def withRunner[T](op: Runner => T): T = { + val runner = getRunner() + val result = op(runner) + freeRunner(runner) + result + } + + private def killAll(): Unit = allRunners.foreach(_.kill()) + + // On shutdown, we need to kill all runners: + sys.addShutdownHook(killAll()) + } +} |