diff options
author | Paul Phillips <paulp@improving.org> | 2011-01-12 02:49:08 +0000 |
---|---|---|
committer | Paul Phillips <paulp@improving.org> | 2011-01-12 02:49:08 +0000 |
commit | 5bada810b4c7eda186aa40b94a78326520b3fa92 (patch) | |
tree | eee2227bf7f53a6b04e9732d887961b403975ab5 /src/library/scala/sys/process/ProcessImpl.scala | |
parent | 566fefb05abe31e90f765d1fb0a89b264302d9ce (diff) | |
download | scala-5bada810b4c7eda186aa40b94a78326520b3fa92.tar.gz scala-5bada810b4c7eda186aa40b94a78326520b3fa92.tar.bz2 scala-5bada810b4c7eda186aa40b94a78326520b3fa92.zip |
Imported sbt.Process into trunk, in the guise o...
Imported sbt.Process into trunk, in the guise of package
scala.sys.process. It is largely indistinguishable from the version in
sbt, at least from the outside.
Also, I renamed package system to sys. I wanted to do that from the
beginning and the desire has only grown since then. Sometimes a short
identifier is just critical to usability: with a function like error("")
called from hundreds of places, the difference between system.error and
sys.error is too big. sys.error and sys.exit have good vibes (at least
as good as the vibes can be for functions which error and exit.)
Note: this is just the first cut. I need to check this in to finish
fixing partest. I will be going over it with a comb and writing
documentation which will leave you enchanted, as well as removing other
bits which are now redundant or inferior. No review.
Diffstat (limited to 'src/library/scala/sys/process/ProcessImpl.scala')
-rw-r--r-- | src/library/scala/sys/process/ProcessImpl.scala | 232 |
1 files changed, 232 insertions, 0 deletions
diff --git a/src/library/scala/sys/process/ProcessImpl.scala b/src/library/scala/sys/process/ProcessImpl.scala new file mode 100644 index 0000000000..aebce5ac94 --- /dev/null +++ b/src/library/scala/sys/process/ProcessImpl.scala @@ -0,0 +1,232 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2010, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.sys +package process + +import processAliases._ +import java.io.{ FilterInputStream, FilterOutputStream, PipedInputStream, PipedOutputStream } +import java.util.concurrent.LinkedBlockingQueue + +/** Runs provided code in a new Thread and returns the Thread instance. */ +private object Spawn { + def apply(f: => Unit): Thread = apply(f, false) + def apply(f: => Unit, daemon: Boolean): Thread = { + val thread = new Thread() { override def run() = { f } } + thread.setDaemon(daemon) + thread.start() + thread + } +} +private object Future { + def apply[T](f: => T): () => T = { + val result = new SyncVar[Either[Throwable, T]] + def run: Unit = + try result.set(Right(f)) + catch { case e: Exception => result set Left(e) } + + Spawn(run) + + () => result.get match { + case Right(value) => value + case Left(exception) => throw exception + } + } +} +object Uncloseable { + def apply(in: InputStream): InputStream = new FilterInputStream(in) { override def close() { } } + def apply(out: OutputStream): OutputStream = new FilterOutputStream(out) { override def close() { } } + def protect(in: InputStream): InputStream = if (in eq System.in) Uncloseable(in) else in + def protect(out: OutputStream): OutputStream = if ((out eq System.out) || (out eq System.err)) Uncloseable(out) else out +} + +private class AndProcess( + a: ProcessBuilder, + b: ProcessBuilder, + io: ProcessIO +) extends SequentialProcess(a, b, io, _ == 0) + +private class OrProcess( + a: ProcessBuilder, + b: ProcessBuilder, + io: ProcessIO +) extends SequentialProcess(a, b, io, _ != 0) + +private class ProcessSequence( + a: ProcessBuilder, + b: ProcessBuilder, + io: ProcessIO +) extends SequentialProcess(a, b, io, _ => true) + +private class SequentialProcess( + a: ProcessBuilder, + b: ProcessBuilder, + io: ProcessIO, + evaluateSecondProcess: Int => Boolean +) extends CompoundProcess { + + protected[this] override def runAndExitValue() = { + val first = a.run(io) + runInterruptible(first.exitValue)(first.destroy()) flatMap { codeA => + if (evaluateSecondProcess(codeA)) { + val second = b.run(io) + runInterruptible(second.exitValue)(second.destroy()) + } + else Some(codeA) + } + } +} + +private abstract class BasicProcess extends Process { + def start(): Unit +} + +private abstract class CompoundProcess extends BasicProcess { + def destroy() = destroyer() + def exitValue() = getExitValue() getOrElse sys.error("No exit code: process destroyed.") + def start() = getExitValue + + protected lazy val (getExitValue, destroyer) = { + val code = new SyncVar[Option[Int]]() + code set None + val thread = Spawn(code.set(runAndExitValue())) + + ( + Future { thread.join(); code.get }, + () => thread.interrupt() + ) + } + + /** Start and block until the exit value is available and then return it in Some. Return None if destroyed (use 'run')*/ + protected[this] def runAndExitValue(): Option[Int] + + protected[this] def runInterruptible[T](action: => T)(destroyImpl: => Unit): Option[T] = { + try Some(action) + catch { case _: InterruptedException => destroyImpl; None } + } +} + +private class PipedProcesses(a: ProcessBuilder, b: ProcessBuilder, defaultIO: ProcessIO, toError: Boolean) extends CompoundProcess { + protected[this] override def runAndExitValue() = { + val currentSource = new SyncVar[Option[InputStream]] + val pipeOut = new PipedOutputStream + val source = new PipeSource(currentSource, pipeOut, a.toString) + source.start() + + val pipeIn = new PipedInputStream(pipeOut) + val currentSink = new SyncVar[Option[OutputStream]] + val sink = new PipeSink(pipeIn, currentSink, b.toString) + sink.start() + + def handleOutOrError(fromOutput: InputStream) = currentSource put Some(fromOutput) + + val firstIO = + if (toError) + defaultIO.withError(handleOutOrError) + else + defaultIO.withOutput(handleOutOrError) + val secondIO = defaultIO.withInput(toInput => currentSink put Some(toInput)) + + val second = b.run(secondIO) + val first = a.run(firstIO) + try { + runInterruptible { + first.exitValue + currentSource put None + currentSink put None + val result = second.exitValue + result + } { + first.destroy() + second.destroy() + } + } + finally { + BasicIO.close(pipeIn) + BasicIO.close(pipeOut) + } + } +} +private class PipeSource(currentSource: SyncVar[Option[InputStream]], pipe: PipedOutputStream, label: => String) extends Thread { + final override def run() { + currentSource.get match { + case Some(source) => + try BasicIO.transferFully(source, pipe) + catch { case e: IOException => println("I/O error " + e.getMessage + " for process: " + label); e.printStackTrace() } + finally { + BasicIO.close(source) + currentSource.unset() + } + run() + case None => + currentSource.unset() + BasicIO.close(pipe) + } + } +} +private class PipeSink(pipe: PipedInputStream, currentSink: SyncVar[Option[OutputStream]], label: => String) extends Thread { + final override def run() { + currentSink.get match { + case Some(sink) => + try BasicIO.transferFully(pipe, sink) + catch { case e: IOException => println("I/O error " + e.getMessage + " for process: " + label); e.printStackTrace() } + finally { + BasicIO.close(sink) + currentSink.unset() + } + run() + case None => + currentSink.unset() + } + } +} +/** A thin wrapper around a java.lang.Process. `ioThreads` are the Threads created to do I/O. +* The implementation of `exitValue` waits until these threads die before returning. */ +private class DummyProcess(action: => Int) extends Process { + private[this] val exitCode = Future(action) + override def exitValue() = exitCode() + override def destroy() { } +} +/** A thin wrapper around a java.lang.Process. `outputThreads` are the Threads created to read from the +* output and error streams of the process. `inputThread` is the Thread created to write to the input stream of +* the process. +* The implementation of `exitValue` interrupts `inputThread` and then waits until all I/O threads die before +* returning. */ +private class SimpleProcess(p: JProcess, inputThread: Thread, outputThreads: List[Thread]) extends Process { + override def exitValue() = { + try p.waitFor() // wait for the process to terminate + finally inputThread.interrupt() // we interrupt the input thread to notify it that it can terminate + + outputThreads.foreach(_.join()) // this ensures that all output is complete before returning (waitFor does not ensure this) + p.exitValue() + } + override def destroy() = { + try p.destroy() + finally { inputThread.interrupt() } + } +} +private final class ThreadProcess(thread: Thread, success: SyncVar[Boolean]) extends Process { + override def exitValue() = { + thread.join() + if (success.get) 0 else 1 + } + override def destroy() { thread.interrupt() } +} + +private object Streamed { + def apply[T](nonzeroException: Boolean): Streamed[T] = { + val q = new LinkedBlockingQueue[Either[Int, T]] + def next(): Stream[T] = q.take match { + case Left(0) => Stream.empty + case Left(code) => if (nonzeroException) error("Nonzero exit code: " + code) else Stream.empty + case Right(s) => Stream.cons(s, next) + } + new Streamed((s: T) => q.put(Right(s)), code => q.put(Left(code)), () => next()) + } +} +private final class Streamed[T](val process: T => Unit, val done: Int => Unit, val stream: () => Stream[T]) extends NotNull
\ No newline at end of file |