From 2e7d7d45557474df61d54e672bedf07a8ff149d4 Mon Sep 17 00:00:00 2001 From: Paul Phillips Date: Wed, 12 Jan 2011 06:16:56 +0000 Subject: More fiddling with the process code. --- src/library/scala/sys/process/BasicIO.scala | 45 ++- src/library/scala/sys/process/Process.scala | 6 +- src/library/scala/sys/process/ProcessBuilder.scala | 2 +- .../scala/sys/process/ProcessBuilderImpl.scala | 22 +- src/library/scala/sys/process/ProcessIO.scala | 4 +- src/library/scala/sys/process/ProcessImpl.scala | 441 +++++++++++---------- src/library/scala/sys/process/package.scala | 11 +- 7 files changed, 286 insertions(+), 245 deletions(-) (limited to 'src') diff --git a/src/library/scala/sys/process/BasicIO.scala b/src/library/scala/sys/process/BasicIO.scala index 505a859377..4f6a994b21 100644 --- a/src/library/scala/sys/process/BasicIO.scala +++ b/src/library/scala/sys/process/BasicIO.scala @@ -9,16 +9,46 @@ package scala.sys package process -import processAliases._ -import java.io.{ BufferedReader, InputStreamReader } +import processInternal._ +import java.io.{ BufferedReader, InputStreamReader, FilterInputStream, FilterOutputStream } +import java.util.concurrent.LinkedBlockingQueue object BasicIO { final val BufferSize = 8192 final val Newline = props("line.separator") - def apply(buffer: StringBuffer, log: Option[ProcessLogger], withIn: Boolean) = + private[process] final class Streamed[T]( + val process: T => Unit, + val done: Int => Unit, + val stream: () => Stream[T] + ) + + private[process] object Streamed { + def apply[T](nonzeroException: Boolean): Streamed[T] = { + val q = new LinkedBlockingQueue[Either[Int, T]] + def next(): Stream[T] = q.take match { + case Left(0) => Stream.empty + case Left(code) => if (nonzeroException) sys.error("Nonzero exit code: " + code) else Stream.empty + case Right(s) => Stream.cons(s, next) + } + new Streamed((s: T) => q put Right(s), code => q put Left(code), () => next()) + } + } + + private[process] object Uncloseable { + def apply(in: InputStream): InputStream = new FilterInputStream(in) { override def close() { } } + def apply(out: OutputStream): OutputStream = new FilterOutputStream(out) { override def close() { } } + def protect(in: InputStream): InputStream = if (in eq System.in) Uncloseable(in) else in + def protect(out: OutputStream): OutputStream = if ((out eq System.out) || (out eq System.err)) Uncloseable(out) else out + } + + def apply(withIn: Boolean, output: String => Unit, log: Option[ProcessLogger]) = + new ProcessIO(input(withIn), processFully(output), getErr(log)) + + def apply(withIn: Boolean, buffer: StringBuffer, log: Option[ProcessLogger]) = new ProcessIO(input(withIn), processFully(buffer), getErr(log)) - def apply(log: ProcessLogger, withIn: Boolean) = + + def apply(withIn: Boolean, log: ProcessLogger) = new ProcessIO(input(withIn), processInfoFully(log), processErrFully(log)) def getErr(log: Option[ProcessLogger]) = log match { @@ -58,11 +88,11 @@ object BasicIO { def transferFully(in: InputStream, out: OutputStream): Unit = try transferFullyImpl(in, out) - catch { case _: InterruptedException => () } + catch onInterrupt(()) private[this] def appendLine(buffer: Appendable): String => Unit = line => { - buffer.append(line) - buffer.append(Newline) + buffer append line + buffer append Newline } private[this] def transferFullyImpl(in: InputStream, out: OutputStream) { @@ -78,4 +108,3 @@ object BasicIO { loop() } } - diff --git a/src/library/scala/sys/process/Process.scala b/src/library/scala/sys/process/Process.scala index 6453002ae4..c64bb95fcf 100644 --- a/src/library/scala/sys/process/Process.scala +++ b/src/library/scala/sys/process/Process.scala @@ -9,7 +9,7 @@ package scala.sys package process -import processAliases._ +import processInternal._ import ProcessBuilder._ /** Represents a process that is running or has finished running. @@ -23,7 +23,7 @@ trait Process { } /** Methods for constructing simple commands that can then be combined. */ -object Process extends ProcessCreation { } +object Process extends ProcessImpl with ProcessCreation { } trait ProcessCreation { def apply(command: String): ProcessBuilder = apply(command, None) @@ -50,7 +50,7 @@ trait ProcessCreation { /** create ProcessBuilder with working dir optionaly set to File and extra environment variables */ def apply(command: Seq[String], cwd: Option[File], extraEnv: (String, String)*): ProcessBuilder = { - val jpb = new JProcessBuilder(command.toArray : _*) + val jpb = new JProcessBuilder(command.toArray: _*) cwd foreach (jpb directory _) extraEnv foreach { case (k, v) => jpb.environment.put(k, v) } apply(jpb) diff --git a/src/library/scala/sys/process/ProcessBuilder.scala b/src/library/scala/sys/process/ProcessBuilder.scala index 798796b29c..cef269aba2 100644 --- a/src/library/scala/sys/process/ProcessBuilder.scala +++ b/src/library/scala/sys/process/ProcessBuilder.scala @@ -9,7 +9,7 @@ package scala.sys package process -import processAliases._ +import processInternal._ import ProcessBuilder._ /** Represents a runnable process. */ diff --git a/src/library/scala/sys/process/ProcessBuilderImpl.scala b/src/library/scala/sys/process/ProcessBuilderImpl.scala index 3e6d41b5b6..8bb6627e9c 100644 --- a/src/library/scala/sys/process/ProcessBuilderImpl.scala +++ b/src/library/scala/sys/process/ProcessBuilderImpl.scala @@ -9,14 +9,16 @@ package scala.sys package process -import processAliases._ +import processInternal._ +import Process._ import java.io.{ FileInputStream, FileOutputStream } +import BasicIO.{ Uncloseable, Streamed } import Uncloseable.protect private[process] trait ProcessBuilderImpl { self: ProcessBuilder.type => - class Dummy(override val toString: String, exitValue: => Int) extends AbstractBuilder { + private[process] class Dummy(override val toString: String, exitValue: => Int) extends AbstractBuilder { override def run(io: ProcessIO): Process = new DummyProcess(exitValue) override def canPipeTo = true } @@ -83,10 +85,10 @@ private[process] trait ProcessBuilderImpl { def #&&(other: ProcessBuilder): ProcessBuilder = new AndBuilder(this, other) def ###(other: ProcessBuilder): ProcessBuilder = new SequenceBuilder(this, other) - def run(): Process = run(false) - def run(connectInput: Boolean): Process = run(BasicIO.standard(connectInput)) - def run(log: ProcessLogger): Process = run(log, false) - def run(log: ProcessLogger, connectInput: Boolean): Process = run(BasicIO(log, connectInput)) + def run(): Process = run(false) + def run(connectInput: Boolean): Process = run(BasicIO.standard(connectInput)) + def run(log: ProcessLogger): Process = run(log, false) + def run(log: ProcessLogger, connectInput: Boolean): Process = run(BasicIO(connectInput, log)) def !! = slurp(None, false) def !!(log: ProcessLogger) = slurp(Some(log), false) @@ -106,7 +108,7 @@ private[process] trait ProcessBuilderImpl { private[this] def slurp(log: Option[ProcessLogger], withIn: Boolean): String = { val buffer = new StringBuffer - val code = this ! BasicIO(buffer, log, withIn) + val code = this ! BasicIO(withIn, buffer, log) if (code == 0) buffer.toString else sys.error("Nonzero exit value: " + code) @@ -118,7 +120,7 @@ private[process] trait ProcessBuilderImpl { log: Option[ProcessLogger] ): Stream[String] = { val streamed = Streamed[String](nonZeroException) - val process = run(new ProcessIO(BasicIO.input(withInput), BasicIO.processFully(streamed.process), BasicIO.getErr(log))) + val process = run(BasicIO(withInput, streamed.process, log)) Spawn(streamed done process.exitValue()) streamed.stream() @@ -130,10 +132,10 @@ private[process] trait ProcessBuilderImpl { def canPipeTo = false } - class URLImpl(url: URL) extends URLBuilder with Source { + private[process] class URLImpl(url: URL) extends URLBuilder with Source { protected def toSource = new URLInput(url) } - class FileImpl(base: File) extends FileBuilder with Sink with Source { + private[process] class FileImpl(base: File) extends FileBuilder with Sink with Source { protected def toSource = new FileInput(base) protected def toSink = new FileOutput(base, false) diff --git a/src/library/scala/sys/process/ProcessIO.scala b/src/library/scala/sys/process/ProcessIO.scala index b9ac156044..db9c8b3823 100644 --- a/src/library/scala/sys/process/ProcessIO.scala +++ b/src/library/scala/sys/process/ProcessIO.scala @@ -9,7 +9,7 @@ package scala.sys package process -import processAliases._ +import processInternal._ /** Each method will be called in a separate thread.*/ final class ProcessIO( @@ -17,7 +17,7 @@ final class ProcessIO( val processOutput: InputStream => Unit, val processError: InputStream => Unit ) { + def withInput(write: OutputStream => Unit): ProcessIO = new ProcessIO(write, processOutput, processError) def withOutput(process: InputStream => Unit): ProcessIO = new ProcessIO(writeInput, process, processError) def withError(process: InputStream => Unit): ProcessIO = new ProcessIO(writeInput, processOutput, process) - def withInput(write: OutputStream => Unit): ProcessIO = new ProcessIO(write, processOutput, processError) } diff --git a/src/library/scala/sys/process/ProcessImpl.scala b/src/library/scala/sys/process/ProcessImpl.scala index aebce5ac94..23b0ffc266 100644 --- a/src/library/scala/sys/process/ProcessImpl.scala +++ b/src/library/scala/sys/process/ProcessImpl.scala @@ -9,224 +9,225 @@ package scala.sys package process -import processAliases._ -import java.io.{ FilterInputStream, FilterOutputStream, PipedInputStream, PipedOutputStream } -import java.util.concurrent.LinkedBlockingQueue - -/** Runs provided code in a new Thread and returns the Thread instance. */ -private object Spawn { - def apply(f: => Unit): Thread = apply(f, false) - def apply(f: => Unit, daemon: Boolean): Thread = { - val thread = new Thread() { override def run() = { f } } - thread.setDaemon(daemon) - thread.start() - thread - } +import processInternal._ +import java.io.{ PipedInputStream, PipedOutputStream } + +private[process] trait ProcessImpl { + self: Process.type => + + /** Runs provided code in a new Thread and returns the Thread instance. */ + private[process] object Spawn { + def apply(f: => Unit): Thread = apply(f, false) + def apply(f: => Unit, daemon: Boolean): Thread = { + val thread = new Thread() { override def run() = { f } } + thread.setDaemon(daemon) + thread.start() + thread + } + } + private[process] object Future { + def apply[T](f: => T): () => T = { + val result = new SyncVar[Either[Throwable, T]] + def run: Unit = + try result set Right(f) + catch { case e: Exception => result set Left(e) } + + Spawn(run) + + () => result.get match { + case Right(value) => value + case Left(exception) => throw exception + } + } + } + + private[process] class AndProcess( + a: ProcessBuilder, + b: ProcessBuilder, + io: ProcessIO + ) extends SequentialProcess(a, b, io, _ == 0) + + private[process] class OrProcess( + a: ProcessBuilder, + b: ProcessBuilder, + io: ProcessIO + ) extends SequentialProcess(a, b, io, _ != 0) + + private[process] class ProcessSequence( + a: ProcessBuilder, + b: ProcessBuilder, + io: ProcessIO + ) extends SequentialProcess(a, b, io, _ => true) + + private[process] class SequentialProcess( + a: ProcessBuilder, + b: ProcessBuilder, + io: ProcessIO, + evaluateSecondProcess: Int => Boolean + ) extends CompoundProcess { + + protected[this] override def runAndExitValue() = { + val first = a.run(io) + runInterruptible(first.exitValue)(first.destroy()) flatMap { codeA => + if (evaluateSecondProcess(codeA)) { + val second = b.run(io) + runInterruptible(second.exitValue)(second.destroy()) + } + else Some(codeA) + } + } + } + + private[process] abstract class BasicProcess extends Process { + def start(): Unit + } + + private[process] abstract class CompoundProcess extends BasicProcess { + def destroy() = destroyer() + def exitValue() = getExitValue() getOrElse sys.error("No exit code: process destroyed.") + def start() = getExitValue + + protected lazy val (getExitValue, destroyer) = { + val code = new SyncVar[Option[Int]]() + code set None + val thread = Spawn(code set runAndExitValue()) + + ( + Future { thread.join(); code.get }, + () => thread.interrupt() + ) + } + + /** Start and block until the exit value is available and then return it in Some. Return None if destroyed (use 'run')*/ + protected[this] def runAndExitValue(): Option[Int] + + protected[this] def runInterruptible[T](action: => T)(destroyImpl: => Unit): Option[T] = { + try Some(action) + catch onInterrupt { destroyImpl; None } + } + } + + private[process] class PipedProcesses(a: ProcessBuilder, b: ProcessBuilder, defaultIO: ProcessIO, toError: Boolean) extends CompoundProcess { + protected[this] override def runAndExitValue() = { + val currentSource = new SyncVar[Option[InputStream]] + val pipeOut = new PipedOutputStream + val source = new PipeSource(currentSource, pipeOut, a.toString) + source.start() + + val pipeIn = new PipedInputStream(pipeOut) + val currentSink = new SyncVar[Option[OutputStream]] + val sink = new PipeSink(pipeIn, currentSink, b.toString) + sink.start() + + def handleOutOrError(fromOutput: InputStream) = currentSource put Some(fromOutput) + + val firstIO = + if (toError) + defaultIO.withError(handleOutOrError) + else + defaultIO.withOutput(handleOutOrError) + val secondIO = defaultIO.withInput(toInput => currentSink put Some(toInput)) + + val second = b.run(secondIO) + val first = a.run(firstIO) + try { + runInterruptible { + first.exitValue + currentSource put None + currentSink put None + second.exitValue + } { + first.destroy() + second.destroy() + } + } + finally { + BasicIO close pipeIn + BasicIO close pipeOut + } + } + } + + private[process] abstract class PipeThread(isSink: Boolean, labelFn: () => String) extends Thread { + def run(): Unit + + private[process] def runloop(src: InputStream, dst: OutputStream): Unit = { + try BasicIO.transferFully(src, dst) + catch ioFailure(ioHandler) + finally BasicIO close { + if (isSink) dst else src + } + } + private def ioHandler(e: IOException) { + println("I/O error " + e.getMessage + " for process: " + labelFn()) + e.printStackTrace() + } + } + + private[process] class PipeSource( + currentSource: SyncVar[Option[InputStream]], + pipe: PipedOutputStream, + label: => String + ) extends PipeThread(false, () => label) { + + final override def run(): Unit = currentSource.get match { + case Some(source) => + try runloop(source, pipe) + finally currentSource.unset() + + run() + case None => + currentSource.unset() + BasicIO close pipe + } + } + private[process] class PipeSink( + pipe: PipedInputStream, + currentSink: SyncVar[Option[OutputStream]], + label: => String + ) extends PipeThread(true, () => label) { + + final override def run(): Unit = currentSink.get match { + case Some(sink) => + try runloop(pipe, sink) + finally currentSink.unset() + + run() + case None => + currentSink.unset() + } + } + + /** A thin wrapper around a java.lang.Process. `ioThreads` are the Threads created to do I/O. + * The implementation of `exitValue` waits until these threads die before returning. */ + private[process] class DummyProcess(action: => Int) extends Process { + private[this] val exitCode = Future(action) + override def exitValue() = exitCode() + override def destroy() { } + } + /** A thin wrapper around a java.lang.Process. `outputThreads` are the Threads created to read from the + * output and error streams of the process. `inputThread` is the Thread created to write to the input stream of + * the process. + * The implementation of `exitValue` interrupts `inputThread` and then waits until all I/O threads die before + * returning. */ + private[process] class SimpleProcess(p: JProcess, inputThread: Thread, outputThreads: List[Thread]) extends Process { + override def exitValue() = { + try p.waitFor() // wait for the process to terminate + finally inputThread.interrupt() // we interrupt the input thread to notify it that it can terminate + outputThreads foreach (_.join()) // this ensures that all output is complete before returning (waitFor does not ensure this) + + p.exitValue() + } + override def destroy() = { + try p.destroy() + finally inputThread.interrupt() + } + } + private[process] final class ThreadProcess(thread: Thread, success: SyncVar[Boolean]) extends Process { + override def exitValue() = { + thread.join() + if (success.get) 0 else 1 + } + override def destroy() { thread.interrupt() } + } } -private object Future { - def apply[T](f: => T): () => T = { - val result = new SyncVar[Either[Throwable, T]] - def run: Unit = - try result.set(Right(f)) - catch { case e: Exception => result set Left(e) } - - Spawn(run) - - () => result.get match { - case Right(value) => value - case Left(exception) => throw exception - } - } -} -object Uncloseable { - def apply(in: InputStream): InputStream = new FilterInputStream(in) { override def close() { } } - def apply(out: OutputStream): OutputStream = new FilterOutputStream(out) { override def close() { } } - def protect(in: InputStream): InputStream = if (in eq System.in) Uncloseable(in) else in - def protect(out: OutputStream): OutputStream = if ((out eq System.out) || (out eq System.err)) Uncloseable(out) else out -} - -private class AndProcess( - a: ProcessBuilder, - b: ProcessBuilder, - io: ProcessIO -) extends SequentialProcess(a, b, io, _ == 0) - -private class OrProcess( - a: ProcessBuilder, - b: ProcessBuilder, - io: ProcessIO -) extends SequentialProcess(a, b, io, _ != 0) - -private class ProcessSequence( - a: ProcessBuilder, - b: ProcessBuilder, - io: ProcessIO -) extends SequentialProcess(a, b, io, _ => true) - -private class SequentialProcess( - a: ProcessBuilder, - b: ProcessBuilder, - io: ProcessIO, - evaluateSecondProcess: Int => Boolean -) extends CompoundProcess { - - protected[this] override def runAndExitValue() = { - val first = a.run(io) - runInterruptible(first.exitValue)(first.destroy()) flatMap { codeA => - if (evaluateSecondProcess(codeA)) { - val second = b.run(io) - runInterruptible(second.exitValue)(second.destroy()) - } - else Some(codeA) - } - } -} - -private abstract class BasicProcess extends Process { - def start(): Unit -} - -private abstract class CompoundProcess extends BasicProcess { - def destroy() = destroyer() - def exitValue() = getExitValue() getOrElse sys.error("No exit code: process destroyed.") - def start() = getExitValue - - protected lazy val (getExitValue, destroyer) = { - val code = new SyncVar[Option[Int]]() - code set None - val thread = Spawn(code.set(runAndExitValue())) - - ( - Future { thread.join(); code.get }, - () => thread.interrupt() - ) - } - - /** Start and block until the exit value is available and then return it in Some. Return None if destroyed (use 'run')*/ - protected[this] def runAndExitValue(): Option[Int] - - protected[this] def runInterruptible[T](action: => T)(destroyImpl: => Unit): Option[T] = { - try Some(action) - catch { case _: InterruptedException => destroyImpl; None } - } -} - -private class PipedProcesses(a: ProcessBuilder, b: ProcessBuilder, defaultIO: ProcessIO, toError: Boolean) extends CompoundProcess { - protected[this] override def runAndExitValue() = { - val currentSource = new SyncVar[Option[InputStream]] - val pipeOut = new PipedOutputStream - val source = new PipeSource(currentSource, pipeOut, a.toString) - source.start() - - val pipeIn = new PipedInputStream(pipeOut) - val currentSink = new SyncVar[Option[OutputStream]] - val sink = new PipeSink(pipeIn, currentSink, b.toString) - sink.start() - - def handleOutOrError(fromOutput: InputStream) = currentSource put Some(fromOutput) - - val firstIO = - if (toError) - defaultIO.withError(handleOutOrError) - else - defaultIO.withOutput(handleOutOrError) - val secondIO = defaultIO.withInput(toInput => currentSink put Some(toInput)) - - val second = b.run(secondIO) - val first = a.run(firstIO) - try { - runInterruptible { - first.exitValue - currentSource put None - currentSink put None - val result = second.exitValue - result - } { - first.destroy() - second.destroy() - } - } - finally { - BasicIO.close(pipeIn) - BasicIO.close(pipeOut) - } - } -} -private class PipeSource(currentSource: SyncVar[Option[InputStream]], pipe: PipedOutputStream, label: => String) extends Thread { - final override def run() { - currentSource.get match { - case Some(source) => - try BasicIO.transferFully(source, pipe) - catch { case e: IOException => println("I/O error " + e.getMessage + " for process: " + label); e.printStackTrace() } - finally { - BasicIO.close(source) - currentSource.unset() - } - run() - case None => - currentSource.unset() - BasicIO.close(pipe) - } - } -} -private class PipeSink(pipe: PipedInputStream, currentSink: SyncVar[Option[OutputStream]], label: => String) extends Thread { - final override def run() { - currentSink.get match { - case Some(sink) => - try BasicIO.transferFully(pipe, sink) - catch { case e: IOException => println("I/O error " + e.getMessage + " for process: " + label); e.printStackTrace() } - finally { - BasicIO.close(sink) - currentSink.unset() - } - run() - case None => - currentSink.unset() - } - } -} -/** A thin wrapper around a java.lang.Process. `ioThreads` are the Threads created to do I/O. -* The implementation of `exitValue` waits until these threads die before returning. */ -private class DummyProcess(action: => Int) extends Process { - private[this] val exitCode = Future(action) - override def exitValue() = exitCode() - override def destroy() { } -} -/** A thin wrapper around a java.lang.Process. `outputThreads` are the Threads created to read from the -* output and error streams of the process. `inputThread` is the Thread created to write to the input stream of -* the process. -* The implementation of `exitValue` interrupts `inputThread` and then waits until all I/O threads die before -* returning. */ -private class SimpleProcess(p: JProcess, inputThread: Thread, outputThreads: List[Thread]) extends Process { - override def exitValue() = { - try p.waitFor() // wait for the process to terminate - finally inputThread.interrupt() // we interrupt the input thread to notify it that it can terminate - - outputThreads.foreach(_.join()) // this ensures that all output is complete before returning (waitFor does not ensure this) - p.exitValue() - } - override def destroy() = { - try p.destroy() - finally { inputThread.interrupt() } - } -} -private final class ThreadProcess(thread: Thread, success: SyncVar[Boolean]) extends Process { - override def exitValue() = { - thread.join() - if (success.get) 0 else 1 - } - override def destroy() { thread.interrupt() } -} - -private object Streamed { - def apply[T](nonzeroException: Boolean): Streamed[T] = { - val q = new LinkedBlockingQueue[Either[Int, T]] - def next(): Stream[T] = q.take match { - case Left(0) => Stream.empty - case Left(code) => if (nonzeroException) error("Nonzero exit code: " + code) else Stream.empty - case Right(s) => Stream.cons(s, next) - } - new Streamed((s: T) => q.put(Right(s)), code => q.put(Left(code)), () => next()) - } -} -private final class Streamed[T](val process: T => Unit, val done: Int => Unit, val stream: () => Stream[T]) extends NotNull \ No newline at end of file diff --git a/src/library/scala/sys/process/package.scala b/src/library/scala/sys/process/package.scala index 659c883d39..ae501841fa 100644 --- a/src/library/scala/sys/process/package.scala +++ b/src/library/scala/sys/process/package.scala @@ -11,7 +11,8 @@ package scala.sys package object process extends ProcessImplicits { // These are in a nested object instead of at the package level // due to the issues described in tickets #3160 and #3836. - private[process] object processAliases { + private[process] object processInternal { + type =?>[-A, +B] = PartialFunction[A, B] type Closeable = java.io.Closeable type File = java.io.File type IOException = java.io.IOException @@ -21,5 +22,13 @@ package object process extends ProcessImplicits { type OutputStream = java.io.OutputStream type SyncVar[T] = scala.concurrent.SyncVar[T] type URL = java.net.URL + + def onInterrupt[T](handler: => T): Throwable =?> T = { + case _: InterruptedException => handler + } + + def ioFailure[T](handler: IOException => T): Throwable =?> T = { + case e: IOException => handler(e) + } } } -- cgit v1.2.3