diff options
author | Paul Phillips <paulp@improving.org> | 2011-01-12 06:16:56 +0000 |
---|---|---|
committer | Paul Phillips <paulp@improving.org> | 2011-01-12 06:16:56 +0000 |
commit | 2e7d7d45557474df61d54e672bedf07a8ff149d4 (patch) | |
tree | fb218a2c7a37c4326525e32134990dd9756c5fc4 /src/library/scala/sys/process/ProcessImpl.scala | |
parent | 5bada810b4c7eda186aa40b94a78326520b3fa92 (diff) | |
download | scala-2e7d7d45557474df61d54e672bedf07a8ff149d4.tar.gz scala-2e7d7d45557474df61d54e672bedf07a8ff149d4.tar.bz2 scala-2e7d7d45557474df61d54e672bedf07a8ff149d4.zip |
More fiddling with the process code.
Diffstat (limited to 'src/library/scala/sys/process/ProcessImpl.scala')
-rw-r--r-- | src/library/scala/sys/process/ProcessImpl.scala | 441 |
1 files changed, 221 insertions, 220 deletions
diff --git a/src/library/scala/sys/process/ProcessImpl.scala b/src/library/scala/sys/process/ProcessImpl.scala index aebce5ac94..23b0ffc266 100644 --- a/src/library/scala/sys/process/ProcessImpl.scala +++ b/src/library/scala/sys/process/ProcessImpl.scala @@ -9,224 +9,225 @@ package scala.sys package process -import processAliases._ -import java.io.{ FilterInputStream, FilterOutputStream, PipedInputStream, PipedOutputStream } -import java.util.concurrent.LinkedBlockingQueue - -/** Runs provided code in a new Thread and returns the Thread instance. */ -private object Spawn { - def apply(f: => Unit): Thread = apply(f, false) - def apply(f: => Unit, daemon: Boolean): Thread = { - val thread = new Thread() { override def run() = { f } } - thread.setDaemon(daemon) - thread.start() - thread - } +import processInternal._ +import java.io.{ PipedInputStream, PipedOutputStream } + +private[process] trait ProcessImpl { + self: Process.type => + + /** Runs provided code in a new Thread and returns the Thread instance. */ + private[process] object Spawn { + def apply(f: => Unit): Thread = apply(f, false) + def apply(f: => Unit, daemon: Boolean): Thread = { + val thread = new Thread() { override def run() = { f } } + thread.setDaemon(daemon) + thread.start() + thread + } + } + private[process] object Future { + def apply[T](f: => T): () => T = { + val result = new SyncVar[Either[Throwable, T]] + def run: Unit = + try result set Right(f) + catch { case e: Exception => result set Left(e) } + + Spawn(run) + + () => result.get match { + case Right(value) => value + case Left(exception) => throw exception + } + } + } + + private[process] class AndProcess( + a: ProcessBuilder, + b: ProcessBuilder, + io: ProcessIO + ) extends SequentialProcess(a, b, io, _ == 0) + + private[process] class OrProcess( + a: ProcessBuilder, + b: ProcessBuilder, + io: ProcessIO + ) extends SequentialProcess(a, b, io, _ != 0) + + private[process] class ProcessSequence( + a: ProcessBuilder, + b: ProcessBuilder, + io: ProcessIO + ) extends SequentialProcess(a, b, io, _ => true) + + private[process] class SequentialProcess( + a: ProcessBuilder, + b: ProcessBuilder, + io: ProcessIO, + evaluateSecondProcess: Int => Boolean + ) extends CompoundProcess { + + protected[this] override def runAndExitValue() = { + val first = a.run(io) + runInterruptible(first.exitValue)(first.destroy()) flatMap { codeA => + if (evaluateSecondProcess(codeA)) { + val second = b.run(io) + runInterruptible(second.exitValue)(second.destroy()) + } + else Some(codeA) + } + } + } + + private[process] abstract class BasicProcess extends Process { + def start(): Unit + } + + private[process] abstract class CompoundProcess extends BasicProcess { + def destroy() = destroyer() + def exitValue() = getExitValue() getOrElse sys.error("No exit code: process destroyed.") + def start() = getExitValue + + protected lazy val (getExitValue, destroyer) = { + val code = new SyncVar[Option[Int]]() + code set None + val thread = Spawn(code set runAndExitValue()) + + ( + Future { thread.join(); code.get }, + () => thread.interrupt() + ) + } + + /** Start and block until the exit value is available and then return it in Some. Return None if destroyed (use 'run')*/ + protected[this] def runAndExitValue(): Option[Int] + + protected[this] def runInterruptible[T](action: => T)(destroyImpl: => Unit): Option[T] = { + try Some(action) + catch onInterrupt { destroyImpl; None } + } + } + + private[process] class PipedProcesses(a: ProcessBuilder, b: ProcessBuilder, defaultIO: ProcessIO, toError: Boolean) extends CompoundProcess { + protected[this] override def runAndExitValue() = { + val currentSource = new SyncVar[Option[InputStream]] + val pipeOut = new PipedOutputStream + val source = new PipeSource(currentSource, pipeOut, a.toString) + source.start() + + val pipeIn = new PipedInputStream(pipeOut) + val currentSink = new SyncVar[Option[OutputStream]] + val sink = new PipeSink(pipeIn, currentSink, b.toString) + sink.start() + + def handleOutOrError(fromOutput: InputStream) = currentSource put Some(fromOutput) + + val firstIO = + if (toError) + defaultIO.withError(handleOutOrError) + else + defaultIO.withOutput(handleOutOrError) + val secondIO = defaultIO.withInput(toInput => currentSink put Some(toInput)) + + val second = b.run(secondIO) + val first = a.run(firstIO) + try { + runInterruptible { + first.exitValue + currentSource put None + currentSink put None + second.exitValue + } { + first.destroy() + second.destroy() + } + } + finally { + BasicIO close pipeIn + BasicIO close pipeOut + } + } + } + + private[process] abstract class PipeThread(isSink: Boolean, labelFn: () => String) extends Thread { + def run(): Unit + + private[process] def runloop(src: InputStream, dst: OutputStream): Unit = { + try BasicIO.transferFully(src, dst) + catch ioFailure(ioHandler) + finally BasicIO close { + if (isSink) dst else src + } + } + private def ioHandler(e: IOException) { + println("I/O error " + e.getMessage + " for process: " + labelFn()) + e.printStackTrace() + } + } + + private[process] class PipeSource( + currentSource: SyncVar[Option[InputStream]], + pipe: PipedOutputStream, + label: => String + ) extends PipeThread(false, () => label) { + + final override def run(): Unit = currentSource.get match { + case Some(source) => + try runloop(source, pipe) + finally currentSource.unset() + + run() + case None => + currentSource.unset() + BasicIO close pipe + } + } + private[process] class PipeSink( + pipe: PipedInputStream, + currentSink: SyncVar[Option[OutputStream]], + label: => String + ) extends PipeThread(true, () => label) { + + final override def run(): Unit = currentSink.get match { + case Some(sink) => + try runloop(pipe, sink) + finally currentSink.unset() + + run() + case None => + currentSink.unset() + } + } + + /** A thin wrapper around a java.lang.Process. `ioThreads` are the Threads created to do I/O. + * The implementation of `exitValue` waits until these threads die before returning. */ + private[process] class DummyProcess(action: => Int) extends Process { + private[this] val exitCode = Future(action) + override def exitValue() = exitCode() + override def destroy() { } + } + /** A thin wrapper around a java.lang.Process. `outputThreads` are the Threads created to read from the + * output and error streams of the process. `inputThread` is the Thread created to write to the input stream of + * the process. + * The implementation of `exitValue` interrupts `inputThread` and then waits until all I/O threads die before + * returning. */ + private[process] class SimpleProcess(p: JProcess, inputThread: Thread, outputThreads: List[Thread]) extends Process { + override def exitValue() = { + try p.waitFor() // wait for the process to terminate + finally inputThread.interrupt() // we interrupt the input thread to notify it that it can terminate + outputThreads foreach (_.join()) // this ensures that all output is complete before returning (waitFor does not ensure this) + + p.exitValue() + } + override def destroy() = { + try p.destroy() + finally inputThread.interrupt() + } + } + private[process] final class ThreadProcess(thread: Thread, success: SyncVar[Boolean]) extends Process { + override def exitValue() = { + thread.join() + if (success.get) 0 else 1 + } + override def destroy() { thread.interrupt() } + } } -private object Future { - def apply[T](f: => T): () => T = { - val result = new SyncVar[Either[Throwable, T]] - def run: Unit = - try result.set(Right(f)) - catch { case e: Exception => result set Left(e) } - - Spawn(run) - - () => result.get match { - case Right(value) => value - case Left(exception) => throw exception - } - } -} -object Uncloseable { - def apply(in: InputStream): InputStream = new FilterInputStream(in) { override def close() { } } - def apply(out: OutputStream): OutputStream = new FilterOutputStream(out) { override def close() { } } - def protect(in: InputStream): InputStream = if (in eq System.in) Uncloseable(in) else in - def protect(out: OutputStream): OutputStream = if ((out eq System.out) || (out eq System.err)) Uncloseable(out) else out -} - -private class AndProcess( - a: ProcessBuilder, - b: ProcessBuilder, - io: ProcessIO -) extends SequentialProcess(a, b, io, _ == 0) - -private class OrProcess( - a: ProcessBuilder, - b: ProcessBuilder, - io: ProcessIO -) extends SequentialProcess(a, b, io, _ != 0) - -private class ProcessSequence( - a: ProcessBuilder, - b: ProcessBuilder, - io: ProcessIO -) extends SequentialProcess(a, b, io, _ => true) - -private class SequentialProcess( - a: ProcessBuilder, - b: ProcessBuilder, - io: ProcessIO, - evaluateSecondProcess: Int => Boolean -) extends CompoundProcess { - - protected[this] override def runAndExitValue() = { - val first = a.run(io) - runInterruptible(first.exitValue)(first.destroy()) flatMap { codeA => - if (evaluateSecondProcess(codeA)) { - val second = b.run(io) - runInterruptible(second.exitValue)(second.destroy()) - } - else Some(codeA) - } - } -} - -private abstract class BasicProcess extends Process { - def start(): Unit -} - -private abstract class CompoundProcess extends BasicProcess { - def destroy() = destroyer() - def exitValue() = getExitValue() getOrElse sys.error("No exit code: process destroyed.") - def start() = getExitValue - - protected lazy val (getExitValue, destroyer) = { - val code = new SyncVar[Option[Int]]() - code set None - val thread = Spawn(code.set(runAndExitValue())) - - ( - Future { thread.join(); code.get }, - () => thread.interrupt() - ) - } - - /** Start and block until the exit value is available and then return it in Some. Return None if destroyed (use 'run')*/ - protected[this] def runAndExitValue(): Option[Int] - - protected[this] def runInterruptible[T](action: => T)(destroyImpl: => Unit): Option[T] = { - try Some(action) - catch { case _: InterruptedException => destroyImpl; None } - } -} - -private class PipedProcesses(a: ProcessBuilder, b: ProcessBuilder, defaultIO: ProcessIO, toError: Boolean) extends CompoundProcess { - protected[this] override def runAndExitValue() = { - val currentSource = new SyncVar[Option[InputStream]] - val pipeOut = new PipedOutputStream - val source = new PipeSource(currentSource, pipeOut, a.toString) - source.start() - - val pipeIn = new PipedInputStream(pipeOut) - val currentSink = new SyncVar[Option[OutputStream]] - val sink = new PipeSink(pipeIn, currentSink, b.toString) - sink.start() - - def handleOutOrError(fromOutput: InputStream) = currentSource put Some(fromOutput) - - val firstIO = - if (toError) - defaultIO.withError(handleOutOrError) - else - defaultIO.withOutput(handleOutOrError) - val secondIO = defaultIO.withInput(toInput => currentSink put Some(toInput)) - - val second = b.run(secondIO) - val first = a.run(firstIO) - try { - runInterruptible { - first.exitValue - currentSource put None - currentSink put None - val result = second.exitValue - result - } { - first.destroy() - second.destroy() - } - } - finally { - BasicIO.close(pipeIn) - BasicIO.close(pipeOut) - } - } -} -private class PipeSource(currentSource: SyncVar[Option[InputStream]], pipe: PipedOutputStream, label: => String) extends Thread { - final override def run() { - currentSource.get match { - case Some(source) => - try BasicIO.transferFully(source, pipe) - catch { case e: IOException => println("I/O error " + e.getMessage + " for process: " + label); e.printStackTrace() } - finally { - BasicIO.close(source) - currentSource.unset() - } - run() - case None => - currentSource.unset() - BasicIO.close(pipe) - } - } -} -private class PipeSink(pipe: PipedInputStream, currentSink: SyncVar[Option[OutputStream]], label: => String) extends Thread { - final override def run() { - currentSink.get match { - case Some(sink) => - try BasicIO.transferFully(pipe, sink) - catch { case e: IOException => println("I/O error " + e.getMessage + " for process: " + label); e.printStackTrace() } - finally { - BasicIO.close(sink) - currentSink.unset() - } - run() - case None => - currentSink.unset() - } - } -} -/** A thin wrapper around a java.lang.Process. `ioThreads` are the Threads created to do I/O. -* The implementation of `exitValue` waits until these threads die before returning. */ -private class DummyProcess(action: => Int) extends Process { - private[this] val exitCode = Future(action) - override def exitValue() = exitCode() - override def destroy() { } -} -/** A thin wrapper around a java.lang.Process. `outputThreads` are the Threads created to read from the -* output and error streams of the process. `inputThread` is the Thread created to write to the input stream of -* the process. -* The implementation of `exitValue` interrupts `inputThread` and then waits until all I/O threads die before -* returning. */ -private class SimpleProcess(p: JProcess, inputThread: Thread, outputThreads: List[Thread]) extends Process { - override def exitValue() = { - try p.waitFor() // wait for the process to terminate - finally inputThread.interrupt() // we interrupt the input thread to notify it that it can terminate - - outputThreads.foreach(_.join()) // this ensures that all output is complete before returning (waitFor does not ensure this) - p.exitValue() - } - override def destroy() = { - try p.destroy() - finally { inputThread.interrupt() } - } -} -private final class ThreadProcess(thread: Thread, success: SyncVar[Boolean]) extends Process { - override def exitValue() = { - thread.join() - if (success.get) 0 else 1 - } - override def destroy() { thread.interrupt() } -} - -private object Streamed { - def apply[T](nonzeroException: Boolean): Streamed[T] = { - val q = new LinkedBlockingQueue[Either[Int, T]] - def next(): Stream[T] = q.take match { - case Left(0) => Stream.empty - case Left(code) => if (nonzeroException) error("Nonzero exit code: " + code) else Stream.empty - case Right(s) => Stream.cons(s, next) - } - new Streamed((s: T) => q.put(Right(s)), code => q.put(Left(code)), () => next()) - } -} -private final class Streamed[T](val process: T => Unit, val done: Int => Unit, val stream: () => Stream[T]) extends NotNull
\ No newline at end of file |