summaryrefslogtreecommitdiff
path: root/src/library/scala/sys/process
diff options
context:
space:
mode:
Diffstat (limited to 'src/library/scala/sys/process')
-rw-r--r--src/library/scala/sys/process/BasicIO.scala4
-rw-r--r--src/library/scala/sys/process/Process.scala4
-rw-r--r--src/library/scala/sys/process/ProcessBuilderImpl.scala2
-rw-r--r--src/library/scala/sys/process/ProcessImpl.scala161
-rw-r--r--src/library/scala/sys/process/package.scala34
5 files changed, 115 insertions, 90 deletions
diff --git a/src/library/scala/sys/process/BasicIO.scala b/src/library/scala/sys/process/BasicIO.scala
index 066b2f5373..b39ae77c62 100644
--- a/src/library/scala/sys/process/BasicIO.scala
+++ b/src/library/scala/sys/process/BasicIO.scala
@@ -33,7 +33,7 @@ object BasicIO {
final val BufferSize = 8192
/** Used to separate lines in the `processFully` function that takes `Appendable`. */
- final val Newline = props("line.separator")
+ final val Newline = System.lineSeparator
private[process] final class Streamed[T](
val process: T => Unit,
@@ -221,7 +221,7 @@ object BasicIO {
*/
def transferFully(in: InputStream, out: OutputStream): Unit =
try transferFullyImpl(in, out)
- catch onInterrupt(())
+ catch onIOInterrupt(())
private[this] def appendLine(buffer: Appendable): String => Unit = line => {
buffer append line
diff --git a/src/library/scala/sys/process/Process.scala b/src/library/scala/sys/process/Process.scala
index 06b9967908..0ec749e78a 100644
--- a/src/library/scala/sys/process/Process.scala
+++ b/src/library/scala/sys/process/Process.scala
@@ -26,11 +26,11 @@ import scala.language.implicitConversions
* make it possible for one to block until the process exits and get the exit value,
* or destroy the process altogether.
*
- * Presently, one cannot poll the `Process` to see if it has finished.
- *
* @see [[scala.sys.process.ProcessBuilder]]
*/
trait Process {
+ /** Returns this process alive status */
+ def isAlive(): Boolean
/** Blocks until this process exits and returns the exit code.*/
def exitValue(): Int
/** Destroys this process. */
diff --git a/src/library/scala/sys/process/ProcessBuilderImpl.scala b/src/library/scala/sys/process/ProcessBuilderImpl.scala
index 236baaf038..eef140c16a 100644
--- a/src/library/scala/sys/process/ProcessBuilderImpl.scala
+++ b/src/library/scala/sys/process/ProcessBuilderImpl.scala
@@ -56,7 +56,7 @@ private[process] trait ProcessBuilderImpl {
success put false
val t = Spawn({
runImpl(io)
- success set true
+ success.put(true)
}, io.daemonizeThreads)
new ThreadProcess(t, success)
diff --git a/src/library/scala/sys/process/ProcessImpl.scala b/src/library/scala/sys/process/ProcessImpl.scala
index 2b7fcdeb73..6da0dee056 100644
--- a/src/library/scala/sys/process/ProcessImpl.scala
+++ b/src/library/scala/sys/process/ProcessImpl.scala
@@ -27,18 +27,18 @@ private[process] trait ProcessImpl {
}
}
private[process] object Future {
- def apply[T](f: => T): () => T = {
+ def apply[T](f: => T): (Thread, () => T) = {
val result = new SyncVar[Either[Throwable, T]]
def run(): Unit =
- try result set Right(f)
- catch { case e: Exception => result set Left(e) }
+ try result.put(Right(f))
+ catch { case e: Exception => result.put(Left(e)) }
- Spawn(run())
+ val t = Spawn(run())
- () => result.get match {
+ (t, () => result.get match {
case Right(value) => value
case Left(exception) => throw exception
- }
+ })
}
}
@@ -84,16 +84,18 @@ private[process] trait ProcessImpl {
}
private[process] abstract class CompoundProcess extends BasicProcess {
+ def isAlive() = processThread.isAlive()
def destroy() = destroyer()
- def exitValue() = getExitValue() getOrElse scala.sys.error("No exit code: process destroyed.")
+ def exitValue() = getExitValue._2() getOrElse scala.sys.error("No exit code: process destroyed.")
def start() = getExitValue
- protected lazy val (getExitValue, destroyer) = {
+ protected lazy val (processThread, getExitValue, destroyer) = {
val code = new SyncVar[Option[Int]]()
- code set None
- val thread = Spawn(code set runAndExitValue())
+ code.put(None)
+ val thread = Spawn(code.put(runAndExitValue()))
(
+ thread,
Future { thread.join(); code.get },
() => thread.interrupt()
)
@@ -109,45 +111,46 @@ private[process] trait ProcessImpl {
}
private[process] class PipedProcesses(a: ProcessBuilder, b: ProcessBuilder, defaultIO: ProcessIO, toError: Boolean) extends CompoundProcess {
- protected[this] override def runAndExitValue() = {
- val currentSource = new SyncVar[Option[InputStream]]
- val pipeOut = new PipedOutputStream
- val source = new PipeSource(currentSource, pipeOut, a.toString)
+ protected[this] override def runAndExitValue() = runAndExitValue(new PipeSource(a.toString), new PipeSink(b.toString))
+ protected[this] def runAndExitValue(source: PipeSource, sink: PipeSink): Option[Int] = {
+ source connectOut sink
source.start()
-
- val pipeIn = new PipedInputStream(pipeOut)
- val currentSink = new SyncVar[Option[OutputStream]]
- val sink = new PipeSink(pipeIn, currentSink, b.toString)
sink.start()
- def handleOutOrError(fromOutput: InputStream) = currentSource put Some(fromOutput)
+ /** Release PipeSource, PipeSink and Process in the correct order.
+ * If once connect Process with Source or Sink, then the order of releasing them
+ * must be Source -> Sink -> Process, otherwise IOException will be thrown. */
+ def releaseResources(so: PipeSource, sk: PipeSink, p: Process *) = {
+ so.release()
+ sk.release()
+ p foreach( _.destroy() )
+ }
val firstIO =
- if (toError)
- defaultIO.withError(handleOutOrError)
- else
- defaultIO.withOutput(handleOutOrError)
- val secondIO = defaultIO.withInput(toInput => currentSink put Some(toInput))
-
- val second = b.run(secondIO)
- val first = a.run(firstIO)
- try {
- runInterruptible {
- val exit1 = first.exitValue()
- currentSource put None
- currentSink put None
- val exit2 = second.exitValue()
- // Since file redirection (e.g. #>) is implemented as a piped process,
- // we ignore its exit value so cmd #> file doesn't always return 0.
- if (b.hasExitValue) exit2 else exit1
- } {
- first.destroy()
- second.destroy()
+ if (toError) defaultIO.withError(source.connectIn)
+ else defaultIO.withOutput(source.connectIn)
+ val secondIO = defaultIO.withInput(sink.connectOut)
+
+ val second =
+ try b.run(secondIO)
+ catch onError { err =>
+ releaseResources(source, sink)
+ throw err
}
- }
- finally {
- BasicIO close pipeIn
- BasicIO close pipeOut
+ val first =
+ try a.run(firstIO)
+ catch onError { err =>
+ releaseResources(source, sink, second)
+ throw err
+ }
+ runInterruptible {
+ val exit1 = first.exitValue()
+ val exit2 = second.exitValue()
+ // Since file redirection (e.g. #>) is implemented as a piped process,
+ // we ignore its exit value so cmd #> file doesn't always return 0.
+ if (b.hasExitValue) exit2 else exit1
+ } {
+ releaseResources(source, sink, first, second)
}
}
}
@@ -168,37 +171,46 @@ private[process] trait ProcessImpl {
}
}
- private[process] class PipeSource(
- currentSource: SyncVar[Option[InputStream]],
- pipe: PipedOutputStream,
- label: => String
- ) extends PipeThread(false, () => label) {
-
- final override def run(): Unit = currentSource.get match {
- case Some(source) =>
- try runloop(source, pipe)
- finally currentSource.unset()
-
- run()
- case None =>
- currentSource.unset()
- BasicIO close pipe
+ private[process] class PipeSource(label: => String) extends PipeThread(false, () => label) {
+ protected[this] val pipe = new PipedOutputStream
+ protected[this] val source = new LinkedBlockingQueue[Option[InputStream]]
+ override def run(): Unit = {
+ try {
+ source.take match {
+ case Some(in) => runloop(in, pipe)
+ case None =>
+ }
+ }
+ catch onInterrupt(())
+ finally BasicIO close pipe
+ }
+ def connectIn(in: InputStream): Unit = source add Some(in)
+ def connectOut(sink: PipeSink): Unit = sink connectIn pipe
+ def release(): Unit = {
+ interrupt()
+ source add None
+ join()
}
}
- private[process] class PipeSink(
- pipe: PipedInputStream,
- currentSink: SyncVar[Option[OutputStream]],
- label: => String
- ) extends PipeThread(true, () => label) {
-
- final override def run(): Unit = currentSink.get match {
- case Some(sink) =>
- try runloop(pipe, sink)
- finally currentSink.unset()
-
- run()
- case None =>
- currentSink.unset()
+ private[process] class PipeSink(label: => String) extends PipeThread(true, () => label) {
+ protected[this] val pipe = new PipedInputStream
+ protected[this] val sink = new LinkedBlockingQueue[Option[OutputStream]]
+ override def run(): Unit = {
+ try {
+ sink.take match {
+ case Some(out) => runloop(pipe, out)
+ case None =>
+ }
+ }
+ catch onInterrupt(())
+ finally BasicIO close pipe
+ }
+ def connectOut(out: OutputStream): Unit = sink add Some(out)
+ def connectIn(pipeOut: PipedOutputStream): Unit = pipe connect pipeOut
+ def release(): Unit = {
+ interrupt()
+ sink add None
+ join()
}
}
@@ -206,7 +218,8 @@ private[process] trait ProcessImpl {
* The implementation of `exitValue` waits until these threads die before returning. */
private[process] class DummyProcess(action: => Int) extends Process {
private[this] val exitCode = Future(action)
- override def exitValue() = exitCode()
+ override def isAlive() = exitCode._1.isAlive()
+ override def exitValue() = exitCode._2()
override def destroy() { }
}
/** A thin wrapper around a java.lang.Process. `outputThreads` are the Threads created to read from the
@@ -215,6 +228,7 @@ private[process] trait ProcessImpl {
* The implementation of `exitValue` interrupts `inputThread` and then waits until all I/O threads die before
* returning. */
private[process] class SimpleProcess(p: JProcess, inputThread: Thread, outputThreads: List[Thread]) extends Process {
+ override def isAlive() = p.isAlive()
override def exitValue() = {
try p.waitFor() // wait for the process to terminate
finally inputThread.interrupt() // we interrupt the input thread to notify it that it can terminate
@@ -231,6 +245,7 @@ private[process] trait ProcessImpl {
}
}
private[process] final class ThreadProcess(thread: Thread, success: SyncVar[Boolean]) extends Process {
+ override def isAlive() = thread.isAlive()
override def exitValue() = {
thread.join()
if (success.get) 0 else 1
diff --git a/src/library/scala/sys/process/package.scala b/src/library/scala/sys/process/package.scala
index 445c3aee60..ff0fd920c9 100644
--- a/src/library/scala/sys/process/package.scala
+++ b/src/library/scala/sys/process/package.scala
@@ -203,9 +203,9 @@ package scala.sys {
package object process extends ProcessImplicits {
/** The arguments passed to `java` when creating this process */
def javaVmArguments: List[String] = {
- import scala.collection.JavaConversions._
+ import scala.collection.JavaConverters._
- java.lang.management.ManagementFactory.getRuntimeMXBean().getInputArguments().toList
+ java.lang.management.ManagementFactory.getRuntimeMXBean.getInputArguments.asScala.toList
}
/** The input stream of this process */
def stdin = java.lang.System.in
@@ -225,16 +225,26 @@ package scala.sys {
final val processDebug = props contains "scala.process.debug"
dbg("Initializing process package.")
- type =?>[-A, +B] = PartialFunction[A, B]
- type Closeable = java.io.Closeable
- type File = java.io.File
- type IOException = java.io.IOException
- type InputStream = java.io.InputStream
- type JProcess = java.lang.Process
- type JProcessBuilder = java.lang.ProcessBuilder
- type OutputStream = java.io.OutputStream
- type SyncVar[T] = scala.concurrent.SyncVar[T]
- type URL = java.net.URL
+ type =?>[-A, +B] = PartialFunction[A, B]
+ type Closeable = java.io.Closeable
+ type File = java.io.File
+ type IOException = java.io.IOException
+ type InterruptedIOException = java.io.InterruptedIOException
+ type InputStream = java.io.InputStream
+ type JProcess = java.lang.Process
+ type JProcessBuilder = java.lang.ProcessBuilder
+ type LinkedBlockingQueue[T] = java.util.concurrent.LinkedBlockingQueue[T]
+ type OutputStream = java.io.OutputStream
+ type SyncVar[T] = scala.concurrent.SyncVar[T]
+ type URL = java.net.URL
+
+ def onError[T](handler: Throwable => T): Throwable =?> T = {
+ case e @ _ => handler(e)
+ }
+
+ def onIOInterrupt[T](handler: => T): Throwable =?> T = {
+ case _: InterruptedIOException => handler
+ }
def onInterrupt[T](handler: => T): Throwable =?> T = {
case _: InterruptedException => handler