diff options
Diffstat (limited to 'src/library/scala/sys/process')
-rw-r--r-- | src/library/scala/sys/process/BasicIO.scala | 4 | ||||
-rw-r--r-- | src/library/scala/sys/process/Process.scala | 4 | ||||
-rw-r--r-- | src/library/scala/sys/process/ProcessBuilder.scala | 29 | ||||
-rw-r--r-- | src/library/scala/sys/process/ProcessBuilderImpl.scala | 14 | ||||
-rw-r--r-- | src/library/scala/sys/process/ProcessImpl.scala | 181 | ||||
-rw-r--r-- | src/library/scala/sys/process/package.scala | 38 |
6 files changed, 149 insertions, 121 deletions
diff --git a/src/library/scala/sys/process/BasicIO.scala b/src/library/scala/sys/process/BasicIO.scala index 066b2f5373..b39ae77c62 100644 --- a/src/library/scala/sys/process/BasicIO.scala +++ b/src/library/scala/sys/process/BasicIO.scala @@ -33,7 +33,7 @@ object BasicIO { final val BufferSize = 8192 /** Used to separate lines in the `processFully` function that takes `Appendable`. */ - final val Newline = props("line.separator") + final val Newline = System.lineSeparator private[process] final class Streamed[T]( val process: T => Unit, @@ -221,7 +221,7 @@ object BasicIO { */ def transferFully(in: InputStream, out: OutputStream): Unit = try transferFullyImpl(in, out) - catch onInterrupt(()) + catch onIOInterrupt(()) private[this] def appendLine(buffer: Appendable): String => Unit = line => { buffer append line diff --git a/src/library/scala/sys/process/Process.scala b/src/library/scala/sys/process/Process.scala index 06b9967908..0ec749e78a 100644 --- a/src/library/scala/sys/process/Process.scala +++ b/src/library/scala/sys/process/Process.scala @@ -26,11 +26,11 @@ import scala.language.implicitConversions * make it possible for one to block until the process exits and get the exit value, * or destroy the process altogether. * - * Presently, one cannot poll the `Process` to see if it has finished. - * * @see [[scala.sys.process.ProcessBuilder]] */ trait Process { + /** Returns this process alive status */ + def isAlive(): Boolean /** Blocks until this process exits and returns the exit code.*/ def exitValue(): Int /** Destroys this process. */ diff --git a/src/library/scala/sys/process/ProcessBuilder.scala b/src/library/scala/sys/process/ProcessBuilder.scala index e4344a857e..d0745e5833 100644 --- a/src/library/scala/sys/process/ProcessBuilder.scala +++ b/src/library/scala/sys/process/ProcessBuilder.scala @@ -15,8 +15,8 @@ import ProcessBuilder._ /** Represents a sequence of one or more external processes that can be * executed. A `ProcessBuilder` can be a single external process, or a - * combination of other `ProcessBuilder`. One can control where a - * the output of an external process will go to, and where its input will come + * combination of other `ProcessBuilder`. One can control where the + * output of an external process will go to, and where its input will come * from, or leave that decision to whoever starts it. * * One creates a `ProcessBuilder` through factories provided in @@ -172,9 +172,9 @@ trait ProcessBuilder extends Source with Sink { * and then throw an exception. */ def lineStream: Stream[String] - + /** Deprecated (renamed). Use `lineStream` instead. */ - @deprecated("Use lineStream instead.", "2.11.0") + @deprecated("use lineStream instead", "2.11.0") def lines: Stream[String] = lineStream /** Starts the process represented by this builder. The output is returned as @@ -184,9 +184,9 @@ trait ProcessBuilder extends Source with Sink { * to termination and then throw an exception. */ def lineStream(log: ProcessLogger): Stream[String] - + /** Deprecated (renamed). Use `lineStream(log: ProcessLogger)` instead. */ - @deprecated("Use stream instead.", "2.11.0") + @deprecated("use lineStream instead", "2.11.0") def lines(log: ProcessLogger): Stream[String] = lineStream(log) /** Starts the process represented by this builder. The output is returned as @@ -196,9 +196,9 @@ trait ProcessBuilder extends Source with Sink { * but will not throw an exception. */ def lineStream_! : Stream[String] - + /** Deprecated (renamed). Use `lineStream_!` instead. */ - @deprecated("Use lineStream_! instead.", "2.11.0") + @deprecated("use lineStream_! instead", "2.11.0") def lines_! : Stream[String] = lineStream_! /** Starts the process represented by this builder. The output is returned as @@ -208,9 +208,9 @@ trait ProcessBuilder extends Source with Sink { * to termination but will not throw an exception. */ def lineStream_!(log: ProcessLogger): Stream[String] - + /** Deprecated (renamed). Use `lineStream_!(log: ProcessLogger)` instead. */ - @deprecated("Use stream_! instead.", "2.11.0") + @deprecated("use lineStream_! instead", "2.11.0") def lines_!(log: ProcessLogger): Stream[String] = lineStream_!(log) /** Starts the process represented by this builder, blocks until it exits, and @@ -257,10 +257,9 @@ trait ProcessBuilder extends Source with Sink { */ def run(connectInput: Boolean): Process - /** Starts the process represented by this builder, blocks until it exits, and - * returns the exit code. Standard output and error are sent to the given - * ProcessLogger. The newly started process reads from standard input of the - * current process if `connectInput` is true. + /** Starts the process represented by this builder. Standard output and error + * are sent to the given ProcessLogger. The newly started process reads from + * standard input of the current process if `connectInput` is true. */ def run(log: ProcessLogger, connectInput: Boolean): Process @@ -342,7 +341,7 @@ object ProcessBuilder extends ProcessBuilderImpl { /** Writes the output stream of this process to a [[scala.sys.process.ProcessBuilder]]. */ def #>(b: ProcessBuilder): ProcessBuilder = new PipedBuilder(toSource, b, false) - /** Returnes a [[scala.sys.process.ProcessBuilder]] representing this `Source`. */ + /** Returns a [[scala.sys.process.ProcessBuilder]] representing this `Source`. */ def cat = toSource private def toFile(f: File, append: Boolean) = #> (new FileOutput(f, append)) } diff --git a/src/library/scala/sys/process/ProcessBuilderImpl.scala b/src/library/scala/sys/process/ProcessBuilderImpl.scala index 236baaf038..0df2e648e0 100644 --- a/src/library/scala/sys/process/ProcessBuilderImpl.scala +++ b/src/library/scala/sys/process/ProcessBuilderImpl.scala @@ -53,12 +53,14 @@ private[process] trait ProcessBuilderImpl { override def run(io: ProcessIO): Process = { val success = new SyncVar[Boolean] - success put false - val t = Spawn({ - runImpl(io) - success set true - }, io.daemonizeThreads) - + def go(): Unit = { + var ok = false + try { + runImpl(io) + ok = true + } finally success.put(ok) + } + val t = Spawn(go(), io.daemonizeThreads) new ThreadProcess(t, success) } } diff --git a/src/library/scala/sys/process/ProcessImpl.scala b/src/library/scala/sys/process/ProcessImpl.scala index 2b7fcdeb73..8a0002b316 100644 --- a/src/library/scala/sys/process/ProcessImpl.scala +++ b/src/library/scala/sys/process/ProcessImpl.scala @@ -27,18 +27,18 @@ private[process] trait ProcessImpl { } } private[process] object Future { - def apply[T](f: => T): () => T = { + def apply[T](f: => T): (Thread, () => T) = { val result = new SyncVar[Either[Throwable, T]] def run(): Unit = - try result set Right(f) - catch { case e: Exception => result set Left(e) } + try result.put(Right(f)) + catch { case e: Exception => result.put(Left(e)) } - Spawn(run()) + val t = Spawn(run()) - () => result.get match { + (t, () => result.get match { case Right(value) => value case Left(exception) => throw exception - } + }) } } @@ -84,17 +84,22 @@ private[process] trait ProcessImpl { } private[process] abstract class CompoundProcess extends BasicProcess { + def isAlive() = processThread.isAlive() def destroy() = destroyer() - def exitValue() = getExitValue() getOrElse scala.sys.error("No exit code: process destroyed.") - def start() = getExitValue + def exitValue() = futureValue() getOrElse scala.sys.error("No exit code: process destroyed.") + def start() = { futureThread ;() } - protected lazy val (getExitValue, destroyer) = { + protected lazy val (processThread, (futureThread, futureValue), destroyer) = { val code = new SyncVar[Option[Int]]() - code set None - val thread = Spawn(code set runAndExitValue()) + val thread = Spawn { + var value: Option[Int] = None + try value = runAndExitValue() + finally code.put(value) + } ( - Future { thread.join(); code.get }, + thread, + Future(code.get), // thread.join() () => thread.interrupt() ) } @@ -109,45 +114,46 @@ private[process] trait ProcessImpl { } private[process] class PipedProcesses(a: ProcessBuilder, b: ProcessBuilder, defaultIO: ProcessIO, toError: Boolean) extends CompoundProcess { - protected[this] override def runAndExitValue() = { - val currentSource = new SyncVar[Option[InputStream]] - val pipeOut = new PipedOutputStream - val source = new PipeSource(currentSource, pipeOut, a.toString) + protected[this] override def runAndExitValue() = runAndExitValue(new PipeSource(a.toString), new PipeSink(b.toString)) + protected[this] def runAndExitValue(source: PipeSource, sink: PipeSink): Option[Int] = { + source connectOut sink source.start() - - val pipeIn = new PipedInputStream(pipeOut) - val currentSink = new SyncVar[Option[OutputStream]] - val sink = new PipeSink(pipeIn, currentSink, b.toString) sink.start() - def handleOutOrError(fromOutput: InputStream) = currentSource put Some(fromOutput) + /** Release PipeSource, PipeSink and Process in the correct order. + * If once connect Process with Source or Sink, then the order of releasing them + * must be Source -> Sink -> Process, otherwise IOException will be thrown. */ + def releaseResources(so: PipeSource, sk: PipeSink, p: Process *) = { + so.release() + sk.release() + p foreach( _.destroy() ) + } val firstIO = - if (toError) - defaultIO.withError(handleOutOrError) - else - defaultIO.withOutput(handleOutOrError) - val secondIO = defaultIO.withInput(toInput => currentSink put Some(toInput)) - - val second = b.run(secondIO) - val first = a.run(firstIO) - try { - runInterruptible { - val exit1 = first.exitValue() - currentSource put None - currentSink put None - val exit2 = second.exitValue() - // Since file redirection (e.g. #>) is implemented as a piped process, - // we ignore its exit value so cmd #> file doesn't always return 0. - if (b.hasExitValue) exit2 else exit1 - } { - first.destroy() - second.destroy() + if (toError) defaultIO.withError(source.connectIn) + else defaultIO.withOutput(source.connectIn) + val secondIO = defaultIO.withInput(sink.connectOut) + + val second = + try b.run(secondIO) + catch onError { err => + releaseResources(source, sink) + throw err } - } - finally { - BasicIO close pipeIn - BasicIO close pipeOut + val first = + try a.run(firstIO) + catch onError { err => + releaseResources(source, sink, second) + throw err + } + runInterruptible { + val exit1 = first.exitValue() + val exit2 = second.exitValue() + // Since file redirection (e.g. #>) is implemented as a piped process, + // we ignore its exit value so cmd #> file doesn't always return 0. + if (b.hasExitValue) exit2 else exit1 + } { + releaseResources(source, sink, first, second) } } } @@ -168,53 +174,66 @@ private[process] trait ProcessImpl { } } - private[process] class PipeSource( - currentSource: SyncVar[Option[InputStream]], - pipe: PipedOutputStream, - label: => String - ) extends PipeThread(false, () => label) { - - final override def run(): Unit = currentSource.get match { - case Some(source) => - try runloop(source, pipe) - finally currentSource.unset() - - run() - case None => - currentSource.unset() - BasicIO close pipe + private[process] class PipeSource(label: => String) extends PipeThread(false, () => label) { + protected[this] val pipe = new PipedOutputStream + protected[this] val source = new LinkedBlockingQueue[Option[InputStream]] + override def run(): Unit = { + try { + source.take match { + case Some(in) => runloop(in, pipe) + case None => + } + } + catch onInterrupt(()) + finally BasicIO close pipe + } + def connectIn(in: InputStream): Unit = source add Some(in) + def connectOut(sink: PipeSink): Unit = sink connectIn pipe + def release(): Unit = { + interrupt() + source add None + join() } } - private[process] class PipeSink( - pipe: PipedInputStream, - currentSink: SyncVar[Option[OutputStream]], - label: => String - ) extends PipeThread(true, () => label) { - - final override def run(): Unit = currentSink.get match { - case Some(sink) => - try runloop(pipe, sink) - finally currentSink.unset() - - run() - case None => - currentSink.unset() + private[process] class PipeSink(label: => String) extends PipeThread(true, () => label) { + protected[this] val pipe = new PipedInputStream + protected[this] val sink = new LinkedBlockingQueue[Option[OutputStream]] + override def run(): Unit = { + try { + sink.take match { + case Some(out) => runloop(pipe, out) + case None => + } + } + catch onInterrupt(()) + finally BasicIO close pipe + } + def connectOut(out: OutputStream): Unit = sink add Some(out) + def connectIn(pipeOut: PipedOutputStream): Unit = pipe connect pipeOut + def release(): Unit = { + interrupt() + sink add None + join() } } /** A thin wrapper around a java.lang.Process. `ioThreads` are the Threads created to do I/O. - * The implementation of `exitValue` waits until these threads die before returning. */ + * The implementation of `exitValue` waits until these threads die before returning. + */ private[process] class DummyProcess(action: => Int) extends Process { - private[this] val exitCode = Future(action) - override def exitValue() = exitCode() + private[this] val (thread, value) = Future(action) + override def isAlive() = thread.isAlive() + override def exitValue() = value() override def destroy() { } } + /** A thin wrapper around a java.lang.Process. `outputThreads` are the Threads created to read from the * output and error streams of the process. `inputThread` is the Thread created to write to the input stream of * the process. * The implementation of `exitValue` interrupts `inputThread` and then waits until all I/O threads die before * returning. */ private[process] class SimpleProcess(p: JProcess, inputThread: Thread, outputThreads: List[Thread]) extends Process { + override def isAlive() = p.isAlive() override def exitValue() = { try p.waitFor() // wait for the process to terminate finally inputThread.interrupt() // we interrupt the input thread to notify it that it can terminate @@ -231,10 +250,8 @@ private[process] trait ProcessImpl { } } private[process] final class ThreadProcess(thread: Thread, success: SyncVar[Boolean]) extends Process { - override def exitValue() = { - thread.join() - if (success.get) 0 else 1 - } - override def destroy() { thread.interrupt() } + override def isAlive() = thread.isAlive() + override def exitValue() = if (success.get) 0 else 1 // thread.join() + override def destroy() = thread.interrupt() } } diff --git a/src/library/scala/sys/process/package.scala b/src/library/scala/sys/process/package.scala index ac6ab8f670..440e62b6aa 100644 --- a/src/library/scala/sys/process/package.scala +++ b/src/library/scala/sys/process/package.scala @@ -185,8 +185,8 @@ package scala.sys { * new URL("http://www.scala-lang.org/") #> new File("scala-lang.html") ! * }}} * - * More information about the other ways of controlling I/O can be looked at - * in the scaladoc for the associated objects, traits and classes. + * More information about the other ways of controlling I/O can be found + * in the Scaladoc for the associated objects, traits and classes. * * ==Running the Process== * @@ -203,9 +203,9 @@ package scala.sys { package object process extends ProcessImplicits { /** The arguments passed to `java` when creating this process */ def javaVmArguments: List[String] = { - import scala.collection.JavaConversions._ + import scala.collection.JavaConverters._ - java.lang.management.ManagementFactory.getRuntimeMXBean().getInputArguments().toList + java.lang.management.ManagementFactory.getRuntimeMXBean.getInputArguments.asScala.toList } /** The input stream of this process */ def stdin = java.lang.System.in @@ -225,16 +225,26 @@ package scala.sys { final val processDebug = props contains "scala.process.debug" dbg("Initializing process package.") - type =?>[-A, +B] = PartialFunction[A, B] - type Closeable = java.io.Closeable - type File = java.io.File - type IOException = java.io.IOException - type InputStream = java.io.InputStream - type JProcess = java.lang.Process - type JProcessBuilder = java.lang.ProcessBuilder - type OutputStream = java.io.OutputStream - type SyncVar[T] = scala.concurrent.SyncVar[T] - type URL = java.net.URL + type =?>[-A, +B] = PartialFunction[A, B] + type Closeable = java.io.Closeable + type File = java.io.File + type IOException = java.io.IOException + type InterruptedIOException = java.io.InterruptedIOException + type InputStream = java.io.InputStream + type JProcess = java.lang.Process + type JProcessBuilder = java.lang.ProcessBuilder + type LinkedBlockingQueue[T] = java.util.concurrent.LinkedBlockingQueue[T] + type OutputStream = java.io.OutputStream + type SyncVar[T] = scala.concurrent.SyncVar[T] + type URL = java.net.URL + + def onError[T](handler: Throwable => T): Throwable =?> T = { + case e @ _ => handler(e) + } + + def onIOInterrupt[T](handler: => T): Throwable =?> T = { + case _: InterruptedIOException => handler + } def onInterrupt[T](handler: => T): Throwable =?> T = { case _: InterruptedException => handler |