diff options
Diffstat (limited to 'src/library/scala/sys/process')
-rw-r--r-- | src/library/scala/sys/process/BasicIO.scala | 4 | ||||
-rw-r--r-- | src/library/scala/sys/process/Process.scala | 4 | ||||
-rw-r--r-- | src/library/scala/sys/process/ProcessBuilderImpl.scala | 2 | ||||
-rw-r--r-- | src/library/scala/sys/process/ProcessImpl.scala | 161 | ||||
-rw-r--r-- | src/library/scala/sys/process/package.scala | 34 |
5 files changed, 115 insertions, 90 deletions
diff --git a/src/library/scala/sys/process/BasicIO.scala b/src/library/scala/sys/process/BasicIO.scala index 066b2f5373..b39ae77c62 100644 --- a/src/library/scala/sys/process/BasicIO.scala +++ b/src/library/scala/sys/process/BasicIO.scala @@ -33,7 +33,7 @@ object BasicIO { final val BufferSize = 8192 /** Used to separate lines in the `processFully` function that takes `Appendable`. */ - final val Newline = props("line.separator") + final val Newline = System.lineSeparator private[process] final class Streamed[T]( val process: T => Unit, @@ -221,7 +221,7 @@ object BasicIO { */ def transferFully(in: InputStream, out: OutputStream): Unit = try transferFullyImpl(in, out) - catch onInterrupt(()) + catch onIOInterrupt(()) private[this] def appendLine(buffer: Appendable): String => Unit = line => { buffer append line diff --git a/src/library/scala/sys/process/Process.scala b/src/library/scala/sys/process/Process.scala index 06b9967908..0ec749e78a 100644 --- a/src/library/scala/sys/process/Process.scala +++ b/src/library/scala/sys/process/Process.scala @@ -26,11 +26,11 @@ import scala.language.implicitConversions * make it possible for one to block until the process exits and get the exit value, * or destroy the process altogether. * - * Presently, one cannot poll the `Process` to see if it has finished. - * * @see [[scala.sys.process.ProcessBuilder]] */ trait Process { + /** Returns this process alive status */ + def isAlive(): Boolean /** Blocks until this process exits and returns the exit code.*/ def exitValue(): Int /** Destroys this process. */ diff --git a/src/library/scala/sys/process/ProcessBuilderImpl.scala b/src/library/scala/sys/process/ProcessBuilderImpl.scala index 236baaf038..eef140c16a 100644 --- a/src/library/scala/sys/process/ProcessBuilderImpl.scala +++ b/src/library/scala/sys/process/ProcessBuilderImpl.scala @@ -56,7 +56,7 @@ private[process] trait ProcessBuilderImpl { success put false val t = Spawn({ runImpl(io) - success set true + success.put(true) }, io.daemonizeThreads) new ThreadProcess(t, success) diff --git a/src/library/scala/sys/process/ProcessImpl.scala b/src/library/scala/sys/process/ProcessImpl.scala index 2b7fcdeb73..6da0dee056 100644 --- a/src/library/scala/sys/process/ProcessImpl.scala +++ b/src/library/scala/sys/process/ProcessImpl.scala @@ -27,18 +27,18 @@ private[process] trait ProcessImpl { } } private[process] object Future { - def apply[T](f: => T): () => T = { + def apply[T](f: => T): (Thread, () => T) = { val result = new SyncVar[Either[Throwable, T]] def run(): Unit = - try result set Right(f) - catch { case e: Exception => result set Left(e) } + try result.put(Right(f)) + catch { case e: Exception => result.put(Left(e)) } - Spawn(run()) + val t = Spawn(run()) - () => result.get match { + (t, () => result.get match { case Right(value) => value case Left(exception) => throw exception - } + }) } } @@ -84,16 +84,18 @@ private[process] trait ProcessImpl { } private[process] abstract class CompoundProcess extends BasicProcess { + def isAlive() = processThread.isAlive() def destroy() = destroyer() - def exitValue() = getExitValue() getOrElse scala.sys.error("No exit code: process destroyed.") + def exitValue() = getExitValue._2() getOrElse scala.sys.error("No exit code: process destroyed.") def start() = getExitValue - protected lazy val (getExitValue, destroyer) = { + protected lazy val (processThread, getExitValue, destroyer) = { val code = new SyncVar[Option[Int]]() - code set None - val thread = Spawn(code set runAndExitValue()) + code.put(None) + val thread = Spawn(code.put(runAndExitValue())) ( + thread, Future { thread.join(); code.get }, () => thread.interrupt() ) @@ -109,45 +111,46 @@ private[process] trait ProcessImpl { } private[process] class PipedProcesses(a: ProcessBuilder, b: ProcessBuilder, defaultIO: ProcessIO, toError: Boolean) extends CompoundProcess { - protected[this] override def runAndExitValue() = { - val currentSource = new SyncVar[Option[InputStream]] - val pipeOut = new PipedOutputStream - val source = new PipeSource(currentSource, pipeOut, a.toString) + protected[this] override def runAndExitValue() = runAndExitValue(new PipeSource(a.toString), new PipeSink(b.toString)) + protected[this] def runAndExitValue(source: PipeSource, sink: PipeSink): Option[Int] = { + source connectOut sink source.start() - - val pipeIn = new PipedInputStream(pipeOut) - val currentSink = new SyncVar[Option[OutputStream]] - val sink = new PipeSink(pipeIn, currentSink, b.toString) sink.start() - def handleOutOrError(fromOutput: InputStream) = currentSource put Some(fromOutput) + /** Release PipeSource, PipeSink and Process in the correct order. + * If once connect Process with Source or Sink, then the order of releasing them + * must be Source -> Sink -> Process, otherwise IOException will be thrown. */ + def releaseResources(so: PipeSource, sk: PipeSink, p: Process *) = { + so.release() + sk.release() + p foreach( _.destroy() ) + } val firstIO = - if (toError) - defaultIO.withError(handleOutOrError) - else - defaultIO.withOutput(handleOutOrError) - val secondIO = defaultIO.withInput(toInput => currentSink put Some(toInput)) - - val second = b.run(secondIO) - val first = a.run(firstIO) - try { - runInterruptible { - val exit1 = first.exitValue() - currentSource put None - currentSink put None - val exit2 = second.exitValue() - // Since file redirection (e.g. #>) is implemented as a piped process, - // we ignore its exit value so cmd #> file doesn't always return 0. - if (b.hasExitValue) exit2 else exit1 - } { - first.destroy() - second.destroy() + if (toError) defaultIO.withError(source.connectIn) + else defaultIO.withOutput(source.connectIn) + val secondIO = defaultIO.withInput(sink.connectOut) + + val second = + try b.run(secondIO) + catch onError { err => + releaseResources(source, sink) + throw err } - } - finally { - BasicIO close pipeIn - BasicIO close pipeOut + val first = + try a.run(firstIO) + catch onError { err => + releaseResources(source, sink, second) + throw err + } + runInterruptible { + val exit1 = first.exitValue() + val exit2 = second.exitValue() + // Since file redirection (e.g. #>) is implemented as a piped process, + // we ignore its exit value so cmd #> file doesn't always return 0. + if (b.hasExitValue) exit2 else exit1 + } { + releaseResources(source, sink, first, second) } } } @@ -168,37 +171,46 @@ private[process] trait ProcessImpl { } } - private[process] class PipeSource( - currentSource: SyncVar[Option[InputStream]], - pipe: PipedOutputStream, - label: => String - ) extends PipeThread(false, () => label) { - - final override def run(): Unit = currentSource.get match { - case Some(source) => - try runloop(source, pipe) - finally currentSource.unset() - - run() - case None => - currentSource.unset() - BasicIO close pipe + private[process] class PipeSource(label: => String) extends PipeThread(false, () => label) { + protected[this] val pipe = new PipedOutputStream + protected[this] val source = new LinkedBlockingQueue[Option[InputStream]] + override def run(): Unit = { + try { + source.take match { + case Some(in) => runloop(in, pipe) + case None => + } + } + catch onInterrupt(()) + finally BasicIO close pipe + } + def connectIn(in: InputStream): Unit = source add Some(in) + def connectOut(sink: PipeSink): Unit = sink connectIn pipe + def release(): Unit = { + interrupt() + source add None + join() } } - private[process] class PipeSink( - pipe: PipedInputStream, - currentSink: SyncVar[Option[OutputStream]], - label: => String - ) extends PipeThread(true, () => label) { - - final override def run(): Unit = currentSink.get match { - case Some(sink) => - try runloop(pipe, sink) - finally currentSink.unset() - - run() - case None => - currentSink.unset() + private[process] class PipeSink(label: => String) extends PipeThread(true, () => label) { + protected[this] val pipe = new PipedInputStream + protected[this] val sink = new LinkedBlockingQueue[Option[OutputStream]] + override def run(): Unit = { + try { + sink.take match { + case Some(out) => runloop(pipe, out) + case None => + } + } + catch onInterrupt(()) + finally BasicIO close pipe + } + def connectOut(out: OutputStream): Unit = sink add Some(out) + def connectIn(pipeOut: PipedOutputStream): Unit = pipe connect pipeOut + def release(): Unit = { + interrupt() + sink add None + join() } } @@ -206,7 +218,8 @@ private[process] trait ProcessImpl { * The implementation of `exitValue` waits until these threads die before returning. */ private[process] class DummyProcess(action: => Int) extends Process { private[this] val exitCode = Future(action) - override def exitValue() = exitCode() + override def isAlive() = exitCode._1.isAlive() + override def exitValue() = exitCode._2() override def destroy() { } } /** A thin wrapper around a java.lang.Process. `outputThreads` are the Threads created to read from the @@ -215,6 +228,7 @@ private[process] trait ProcessImpl { * The implementation of `exitValue` interrupts `inputThread` and then waits until all I/O threads die before * returning. */ private[process] class SimpleProcess(p: JProcess, inputThread: Thread, outputThreads: List[Thread]) extends Process { + override def isAlive() = p.isAlive() override def exitValue() = { try p.waitFor() // wait for the process to terminate finally inputThread.interrupt() // we interrupt the input thread to notify it that it can terminate @@ -231,6 +245,7 @@ private[process] trait ProcessImpl { } } private[process] final class ThreadProcess(thread: Thread, success: SyncVar[Boolean]) extends Process { + override def isAlive() = thread.isAlive() override def exitValue() = { thread.join() if (success.get) 0 else 1 diff --git a/src/library/scala/sys/process/package.scala b/src/library/scala/sys/process/package.scala index 445c3aee60..ff0fd920c9 100644 --- a/src/library/scala/sys/process/package.scala +++ b/src/library/scala/sys/process/package.scala @@ -203,9 +203,9 @@ package scala.sys { package object process extends ProcessImplicits { /** The arguments passed to `java` when creating this process */ def javaVmArguments: List[String] = { - import scala.collection.JavaConversions._ + import scala.collection.JavaConverters._ - java.lang.management.ManagementFactory.getRuntimeMXBean().getInputArguments().toList + java.lang.management.ManagementFactory.getRuntimeMXBean.getInputArguments.asScala.toList } /** The input stream of this process */ def stdin = java.lang.System.in @@ -225,16 +225,26 @@ package scala.sys { final val processDebug = props contains "scala.process.debug" dbg("Initializing process package.") - type =?>[-A, +B] = PartialFunction[A, B] - type Closeable = java.io.Closeable - type File = java.io.File - type IOException = java.io.IOException - type InputStream = java.io.InputStream - type JProcess = java.lang.Process - type JProcessBuilder = java.lang.ProcessBuilder - type OutputStream = java.io.OutputStream - type SyncVar[T] = scala.concurrent.SyncVar[T] - type URL = java.net.URL + type =?>[-A, +B] = PartialFunction[A, B] + type Closeable = java.io.Closeable + type File = java.io.File + type IOException = java.io.IOException + type InterruptedIOException = java.io.InterruptedIOException + type InputStream = java.io.InputStream + type JProcess = java.lang.Process + type JProcessBuilder = java.lang.ProcessBuilder + type LinkedBlockingQueue[T] = java.util.concurrent.LinkedBlockingQueue[T] + type OutputStream = java.io.OutputStream + type SyncVar[T] = scala.concurrent.SyncVar[T] + type URL = java.net.URL + + def onError[T](handler: Throwable => T): Throwable =?> T = { + case e @ _ => handler(e) + } + + def onIOInterrupt[T](handler: => T): Throwable =?> T = { + case _: InterruptedIOException => handler + } def onInterrupt[T](handler: => T): Throwable =?> T = { case _: InterruptedException => handler |