diff options
Diffstat (limited to 'src/library/scala/sys/process/ProcessImpl.scala')
-rw-r--r-- | src/library/scala/sys/process/ProcessImpl.scala | 134 |
1 files changed, 72 insertions, 62 deletions
diff --git a/src/library/scala/sys/process/ProcessImpl.scala b/src/library/scala/sys/process/ProcessImpl.scala index 2b7fcdeb73..d15f1a2b3d 100644 --- a/src/library/scala/sys/process/ProcessImpl.scala +++ b/src/library/scala/sys/process/ProcessImpl.scala @@ -109,45 +109,46 @@ private[process] trait ProcessImpl { } private[process] class PipedProcesses(a: ProcessBuilder, b: ProcessBuilder, defaultIO: ProcessIO, toError: Boolean) extends CompoundProcess { - protected[this] override def runAndExitValue() = { - val currentSource = new SyncVar[Option[InputStream]] - val pipeOut = new PipedOutputStream - val source = new PipeSource(currentSource, pipeOut, a.toString) + protected[this] override def runAndExitValue() = runAndExitValue(new PipeSource(a.toString), new PipeSink(b.toString)) + protected[this] def runAndExitValue(source: PipeSource, sink: PipeSink): Option[Int] = { + source connectOut sink source.start() - - val pipeIn = new PipedInputStream(pipeOut) - val currentSink = new SyncVar[Option[OutputStream]] - val sink = new PipeSink(pipeIn, currentSink, b.toString) sink.start() - def handleOutOrError(fromOutput: InputStream) = currentSource put Some(fromOutput) + /** Release PipeSource, PipeSink and Process in the correct order. + * If once connect Process with Source or Sink, then the order of releasing them + * must be Source -> Sink -> Process, otherwise IOException will be thrown. */ + def releaseResources(so: PipeSource, sk: PipeSink, p: Process *) = { + so.release() + sk.release() + p foreach( _.destroy() ) + } val firstIO = - if (toError) - defaultIO.withError(handleOutOrError) - else - defaultIO.withOutput(handleOutOrError) - val secondIO = defaultIO.withInput(toInput => currentSink put Some(toInput)) - - val second = b.run(secondIO) - val first = a.run(firstIO) - try { - runInterruptible { - val exit1 = first.exitValue() - currentSource put None - currentSink put None - val exit2 = second.exitValue() - // Since file redirection (e.g. #>) is implemented as a piped process, - // we ignore its exit value so cmd #> file doesn't always return 0. - if (b.hasExitValue) exit2 else exit1 - } { - first.destroy() - second.destroy() + if (toError) defaultIO.withError(source.connectIn) + else defaultIO.withOutput(source.connectIn) + val secondIO = defaultIO.withInput(sink.connectOut) + + val second = + try b.run(secondIO) + catch onError { err => + releaseResources(source, sink) + throw err } - } - finally { - BasicIO close pipeIn - BasicIO close pipeOut + val first = + try a.run(firstIO) + catch onError { err => + releaseResources(source, sink, second) + throw err + } + runInterruptible { + val exit1 = first.exitValue() + val exit2 = second.exitValue() + // Since file redirection (e.g. #>) is implemented as a piped process, + // we ignore its exit value so cmd #> file doesn't always return 0. + if (b.hasExitValue) exit2 else exit1 + } { + releaseResources(source, sink, first, second) } } } @@ -168,37 +169,46 @@ private[process] trait ProcessImpl { } } - private[process] class PipeSource( - currentSource: SyncVar[Option[InputStream]], - pipe: PipedOutputStream, - label: => String - ) extends PipeThread(false, () => label) { - - final override def run(): Unit = currentSource.get match { - case Some(source) => - try runloop(source, pipe) - finally currentSource.unset() - - run() - case None => - currentSource.unset() - BasicIO close pipe + private[process] class PipeSource(label: => String) extends PipeThread(false, () => label) { + protected[this] val pipe = new PipedOutputStream + protected[this] val source = new LinkedBlockingQueue[Option[InputStream]] + override def run(): Unit = { + try { + source.take match { + case Some(in) => runloop(in, pipe) + case None => + } + } + catch onInterrupt(()) + finally BasicIO close pipe + } + def connectIn(in: InputStream): Unit = source add Some(in) + def connectOut(sink: PipeSink): Unit = sink connectIn pipe + def release(): Unit = { + interrupt() + source add None + join() } } - private[process] class PipeSink( - pipe: PipedInputStream, - currentSink: SyncVar[Option[OutputStream]], - label: => String - ) extends PipeThread(true, () => label) { - - final override def run(): Unit = currentSink.get match { - case Some(sink) => - try runloop(pipe, sink) - finally currentSink.unset() - - run() - case None => - currentSink.unset() + private[process] class PipeSink(label: => String) extends PipeThread(true, () => label) { + protected[this] val pipe = new PipedInputStream + protected[this] val sink = new LinkedBlockingQueue[Option[OutputStream]] + override def run(): Unit = { + try { + sink.take match { + case Some(out) => runloop(pipe, out) + case None => + } + } + catch onInterrupt(()) + finally BasicIO close pipe + } + def connectOut(out: OutputStream): Unit = sink add Some(out) + def connectIn(pipeOut: PipedOutputStream): Unit = pipe connect pipeOut + def release(): Unit = { + interrupt() + sink add None + join() } } |