summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/library/scala/sys/process/BasicIO.scala2
-rw-r--r--src/library/scala/sys/process/ProcessImpl.scala134
-rw-r--r--src/library/scala/sys/process/package.scala30
-rw-r--r--test/files/run/t7350.check43
-rw-r--r--test/files/run/t7350.scala302
5 files changed, 438 insertions, 73 deletions
diff --git a/src/library/scala/sys/process/BasicIO.scala b/src/library/scala/sys/process/BasicIO.scala
index b31bbf0540..866dac4458 100644
--- a/src/library/scala/sys/process/BasicIO.scala
+++ b/src/library/scala/sys/process/BasicIO.scala
@@ -221,7 +221,7 @@ object BasicIO {
*/
def transferFully(in: InputStream, out: OutputStream): Unit =
try transferFullyImpl(in, out)
- catch onInterrupt(())
+ catch onIOInterrupt(())
private[this] def appendLine(buffer: Appendable): String => Unit = line => {
buffer append line
diff --git a/src/library/scala/sys/process/ProcessImpl.scala b/src/library/scala/sys/process/ProcessImpl.scala
index 2b7fcdeb73..d15f1a2b3d 100644
--- a/src/library/scala/sys/process/ProcessImpl.scala
+++ b/src/library/scala/sys/process/ProcessImpl.scala
@@ -109,45 +109,46 @@ private[process] trait ProcessImpl {
}
private[process] class PipedProcesses(a: ProcessBuilder, b: ProcessBuilder, defaultIO: ProcessIO, toError: Boolean) extends CompoundProcess {
- protected[this] override def runAndExitValue() = {
- val currentSource = new SyncVar[Option[InputStream]]
- val pipeOut = new PipedOutputStream
- val source = new PipeSource(currentSource, pipeOut, a.toString)
+ protected[this] override def runAndExitValue() = runAndExitValue(new PipeSource(a.toString), new PipeSink(b.toString))
+ protected[this] def runAndExitValue(source: PipeSource, sink: PipeSink): Option[Int] = {
+ source connectOut sink
source.start()
-
- val pipeIn = new PipedInputStream(pipeOut)
- val currentSink = new SyncVar[Option[OutputStream]]
- val sink = new PipeSink(pipeIn, currentSink, b.toString)
sink.start()
- def handleOutOrError(fromOutput: InputStream) = currentSource put Some(fromOutput)
+ /** Release PipeSource, PipeSink and Process in the correct order.
+ * If once connect Process with Source or Sink, then the order of releasing them
+ * must be Source -> Sink -> Process, otherwise IOException will be thrown. */
+ def releaseResources(so: PipeSource, sk: PipeSink, p: Process *) = {
+ so.release()
+ sk.release()
+ p foreach( _.destroy() )
+ }
val firstIO =
- if (toError)
- defaultIO.withError(handleOutOrError)
- else
- defaultIO.withOutput(handleOutOrError)
- val secondIO = defaultIO.withInput(toInput => currentSink put Some(toInput))
-
- val second = b.run(secondIO)
- val first = a.run(firstIO)
- try {
- runInterruptible {
- val exit1 = first.exitValue()
- currentSource put None
- currentSink put None
- val exit2 = second.exitValue()
- // Since file redirection (e.g. #>) is implemented as a piped process,
- // we ignore its exit value so cmd #> file doesn't always return 0.
- if (b.hasExitValue) exit2 else exit1
- } {
- first.destroy()
- second.destroy()
+ if (toError) defaultIO.withError(source.connectIn)
+ else defaultIO.withOutput(source.connectIn)
+ val secondIO = defaultIO.withInput(sink.connectOut)
+
+ val second =
+ try b.run(secondIO)
+ catch onError { err =>
+ releaseResources(source, sink)
+ throw err
}
- }
- finally {
- BasicIO close pipeIn
- BasicIO close pipeOut
+ val first =
+ try a.run(firstIO)
+ catch onError { err =>
+ releaseResources(source, sink, second)
+ throw err
+ }
+ runInterruptible {
+ val exit1 = first.exitValue()
+ val exit2 = second.exitValue()
+ // Since file redirection (e.g. #>) is implemented as a piped process,
+ // we ignore its exit value so cmd #> file doesn't always return 0.
+ if (b.hasExitValue) exit2 else exit1
+ } {
+ releaseResources(source, sink, first, second)
}
}
}
@@ -168,37 +169,46 @@ private[process] trait ProcessImpl {
}
}
- private[process] class PipeSource(
- currentSource: SyncVar[Option[InputStream]],
- pipe: PipedOutputStream,
- label: => String
- ) extends PipeThread(false, () => label) {
-
- final override def run(): Unit = currentSource.get match {
- case Some(source) =>
- try runloop(source, pipe)
- finally currentSource.unset()
-
- run()
- case None =>
- currentSource.unset()
- BasicIO close pipe
+ private[process] class PipeSource(label: => String) extends PipeThread(false, () => label) {
+ protected[this] val pipe = new PipedOutputStream
+ protected[this] val source = new LinkedBlockingQueue[Option[InputStream]]
+ override def run(): Unit = {
+ try {
+ source.take match {
+ case Some(in) => runloop(in, pipe)
+ case None =>
+ }
+ }
+ catch onInterrupt(())
+ finally BasicIO close pipe
+ }
+ def connectIn(in: InputStream): Unit = source add Some(in)
+ def connectOut(sink: PipeSink): Unit = sink connectIn pipe
+ def release(): Unit = {
+ interrupt()
+ source add None
+ join()
}
}
- private[process] class PipeSink(
- pipe: PipedInputStream,
- currentSink: SyncVar[Option[OutputStream]],
- label: => String
- ) extends PipeThread(true, () => label) {
-
- final override def run(): Unit = currentSink.get match {
- case Some(sink) =>
- try runloop(pipe, sink)
- finally currentSink.unset()
-
- run()
- case None =>
- currentSink.unset()
+ private[process] class PipeSink(label: => String) extends PipeThread(true, () => label) {
+ protected[this] val pipe = new PipedInputStream
+ protected[this] val sink = new LinkedBlockingQueue[Option[OutputStream]]
+ override def run(): Unit = {
+ try {
+ sink.take match {
+ case Some(out) => runloop(pipe, out)
+ case None =>
+ }
+ }
+ catch onInterrupt(())
+ finally BasicIO close pipe
+ }
+ def connectOut(out: OutputStream): Unit = sink add Some(out)
+ def connectIn(pipeOut: PipedOutputStream): Unit = pipe connect pipeOut
+ def release(): Unit = {
+ interrupt()
+ sink add None
+ join()
}
}
diff --git a/src/library/scala/sys/process/package.scala b/src/library/scala/sys/process/package.scala
index 1340a6c415..91fa99e3df 100644
--- a/src/library/scala/sys/process/package.scala
+++ b/src/library/scala/sys/process/package.scala
@@ -224,16 +224,26 @@ package scala.sys {
final val processDebug = props contains "scala.process.debug"
dbg("Initializing process package.")
- type =?>[-A, +B] = PartialFunction[A, B]
- type Closeable = java.io.Closeable
- type File = java.io.File
- type IOException = java.io.IOException
- type InputStream = java.io.InputStream
- type JProcess = java.lang.Process
- type JProcessBuilder = java.lang.ProcessBuilder
- type OutputStream = java.io.OutputStream
- type SyncVar[T] = scala.concurrent.SyncVar[T]
- type URL = java.net.URL
+ type =?>[-A, +B] = PartialFunction[A, B]
+ type Closeable = java.io.Closeable
+ type File = java.io.File
+ type IOException = java.io.IOException
+ type InterruptedIOException = java.io.InterruptedIOException
+ type InputStream = java.io.InputStream
+ type JProcess = java.lang.Process
+ type JProcessBuilder = java.lang.ProcessBuilder
+ type LinkedBlockingQueue[T] = java.util.concurrent.LinkedBlockingQueue[T]
+ type OutputStream = java.io.OutputStream
+ type SyncVar[T] = scala.concurrent.SyncVar[T]
+ type URL = java.net.URL
+
+ def onError[T](handler: Throwable => T): Throwable =?> T = {
+ case e @ _ => handler(e)
+ }
+
+ def onIOInterrupt[T](handler: => T): Throwable =?> T = {
+ case _: InterruptedIOException => handler
+ }
def onInterrupt[T](handler: => T): Throwable =?> T = {
case _: InterruptedException => handler
diff --git a/test/files/run/t7350.check b/test/files/run/t7350.check
new file mode 100644
index 0000000000..5959e04400
--- /dev/null
+++ b/test/files/run/t7350.check
@@ -0,0 +1,43 @@
+PipedProcesses need not to release resources when it normally ends
+- source.releaseCount: 0
+- sink.releaseCount: 0
+- a.destroyCount: 0
+- b.destroyCount: 0
+PipedProcesses must release resources when b.run() fails
+- source.releaseCount: 1
+- sink.releaseCount: 1
+- a.destroyCount: 0
+- b.destroyCount: 0
+PipedProcesses must release resources when a.run() fails
+- source.releaseCount: 1
+- sink.releaseCount: 1
+- a.destroyCount: 0
+- b.destroyCount: 1
+PipedProcesses must release resources when interrupted waiting for first.exitValue()
+- source.releaseCount: 1
+- sink.releaseCount: 1
+- a.destroyCount: 1
+- b.destroyCount: 1
+PipedProcesses must release resources when interrupted waiting for second.exitValue()
+- source.releaseCount: 1
+- sink.releaseCount: 1
+- a.destroyCount: 1
+- b.destroyCount: 1
+PipeSource and PipeSink must release resources when it normally ends
+- in.closed: true
+- out.closed: true
+- source.isReleased: true
+- sink.isReleased: true
+PipeSource and PipeSink must release resources when when waiting for source.take()
+- out.closed: true
+- source.isReleased: true
+- sink.isReleased: true
+PipeSource and PipeSink must release resources when when waiting for sink.take()
+- in.closed: true
+- source.isReleased: true
+- sink.isReleased: true
+PipeSource and PipeSink must release resources when copying stream
+- in.closed: true
+- out.closed: true
+- source.isReleased: true
+- sink.isReleased: true
diff --git a/test/files/run/t7350.scala b/test/files/run/t7350.scala
new file mode 100644
index 0000000000..156d2f0719
--- /dev/null
+++ b/test/files/run/t7350.scala
@@ -0,0 +1,302 @@
+
+import scala.sys.process.debug._
+import scala.sys.process.{ProcessLogger, BasicIO}
+import java.io.{Closeable, ByteArrayInputStream, ByteArrayOutputStream, InputStream, IOException}
+import scala.concurrent.{Future, Await}
+import scala.concurrent.duration.{Duration, SECONDS}
+import scala.util.control.Exception._
+import scala.concurrent.ExecutionContext.Implicits.global
+
+object Test extends App {
+ // Each test normally ends in a moment, but for a worst case,
+ // waits until 5 seconds and breaks off.
+
+ // Tests for PipedProcesses
+
+ println("PipedProcesses need not to release resources when it normally ends")
+
+ {
+ val io = BasicIO(false, ProcessLogger(_ => ()))
+ val source = new PipeSourceMock
+ val sink = new PipeSinkMock
+ val a = new ProcessMock(error = false)
+ val b = new ProcessMock(error = false)
+ val p = new PipedProcesses(new ProcessBuilderMock(a, error = false), new ProcessBuilderMock(b, error = false), io, false)
+ val f = Future {
+ p.callRunAndExitValue(source, sink)
+ }
+ Await.result(f, Duration(5, SECONDS))
+ println(s"- source.releaseCount: ${source.releaseCount}")
+ println(s"- sink.releaseCount: ${sink.releaseCount}")
+ println(s"- a.destroyCount: ${a.destroyCount}")
+ println(s"- b.destroyCount: ${b.destroyCount}")
+ }
+
+ println("PipedProcesses must release resources when b.run() fails")
+
+ {
+ val io = BasicIO(false, ProcessLogger(_ => ()))
+ val source = new PipeSourceMock
+ val sink = new PipeSinkMock
+ val a = new ProcessMock(error = false)
+ val b = new ProcessMock(error = false)
+ val p = new PipedProcesses(new ProcessBuilderMock(a, error = false), new ProcessBuilderMock(b, error = true), io, false)
+ val f = Future {
+ ignoring(classOf[IOException]) {
+ p.callRunAndExitValue(source, sink)
+ }
+ }
+ Await.result(f, Duration(5, SECONDS))
+ println(s"- source.releaseCount: ${source.releaseCount}")
+ println(s"- sink.releaseCount: ${sink.releaseCount}")
+ println(s"- a.destroyCount: ${a.destroyCount}")
+ println(s"- b.destroyCount: ${b.destroyCount}")
+ }
+
+ println("PipedProcesses must release resources when a.run() fails")
+
+ {
+ val io = BasicIO(false, ProcessLogger(_ => ()))
+ val source = new PipeSourceMock
+ val sink = new PipeSinkMock
+ val a = new ProcessMock(error = false)
+ val b = new ProcessMock(error = false)
+ val p = new PipedProcesses(new ProcessBuilderMock(a, error = true), new ProcessBuilderMock(b, error = false), io, false)
+ val f = Future {
+ ignoring(classOf[IOException]) {
+ p.callRunAndExitValue(source, sink)
+ }
+ }
+ Await.result(f, Duration(5, SECONDS))
+ println(s"- source.releaseCount: ${source.releaseCount}")
+ println(s"- sink.releaseCount: ${sink.releaseCount}")
+ println(s"- a.destroyCount: ${a.destroyCount}")
+ println(s"- b.destroyCount: ${b.destroyCount}")
+ }
+
+ println("PipedProcesses must release resources when interrupted waiting for first.exitValue()")
+
+ {
+ val io = BasicIO(false, ProcessLogger(_ => ()))
+ val source = new PipeSourceMock
+ val sink = new PipeSinkMock
+ val a = new ProcessMock(error = true)
+ val b = new ProcessMock(error = false)
+ val p = new PipedProcesses(new ProcessBuilderMock(a, error = false), new ProcessBuilderMock(b, error = false), io, false)
+ val f = Future {
+ p.callRunAndExitValue(source, sink)
+ }
+ Await.result(f, Duration(5, SECONDS))
+ println(s"- source.releaseCount: ${source.releaseCount}")
+ println(s"- sink.releaseCount: ${sink.releaseCount}")
+ println(s"- a.destroyCount: ${a.destroyCount}")
+ println(s"- b.destroyCount: ${b.destroyCount}")
+ }
+
+ println("PipedProcesses must release resources when interrupted waiting for second.exitValue()")
+
+ {
+ val io = BasicIO(false, ProcessLogger(_ => ()))
+ val source = new PipeSourceMock
+ val sink = new PipeSinkMock
+ val a = new ProcessMock(error = false)
+ val b = new ProcessMock(error = true)
+ val p = new PipedProcesses(new ProcessBuilderMock(a, error = false), new ProcessBuilderMock(b, error = false), io, false)
+ val f = Future {
+ p.callRunAndExitValue(source, sink)
+ }
+ Await.result(f, Duration(5, SECONDS))
+ println(s"- source.releaseCount: ${source.releaseCount}")
+ println(s"- sink.releaseCount: ${sink.releaseCount}")
+ println(s"- a.destroyCount: ${a.destroyCount}")
+ println(s"- b.destroyCount: ${b.destroyCount}")
+ }
+
+ // Tests for PipeSource and PipeSink
+
+ trait CloseChecking extends Closeable {
+ var closed = false
+ override def close() = closed = true
+ }
+ class DebugOutputStream extends ByteArrayOutputStream with CloseChecking
+ class DebugInputStream(s: String) extends ByteArrayInputStream(s.getBytes()) with CloseChecking
+ class DebugInfinityInputStream extends InputStream with CloseChecking {
+ def read() = 1
+ }
+
+ def sourceSink() = {
+ val source = new PipeSource
+ val sink = new PipeSink
+ source connectOut sink
+ source.start()
+ sink.start()
+ (source, sink)
+ }
+
+ println("PipeSource and PipeSink must release resources when it normally ends")
+
+ {
+ val in = new DebugInputStream("aaa")
+ val (source, sink) = sourceSink()
+ val out = new DebugOutputStream
+ source connectIn in
+ sink connectOut out
+ val f = Future {
+ source.join()
+ sink.join()
+ }
+ Await.result(f, Duration(5, SECONDS))
+ println(s"- in.closed: ${in.closed}")
+ println(s"- out.closed: ${out.closed}")
+ println(s"- source.isReleased: ${source.isReleased}")
+ println(s"- sink.isReleased: ${sink.isReleased}")
+ }
+
+ println("PipeSource and PipeSink must release resources when when waiting for source.take()")
+
+ {
+ val (source, sink) = sourceSink()
+ val out = new DebugOutputStream
+ sink connectOut out
+ val f = Future {
+ sink.ensureRunloopStarted()
+ source.release()
+ sink.release()
+ }
+ Await.result(f, Duration(5, SECONDS))
+ println(s"- out.closed: ${out.closed}")
+ println(s"- source.isReleased: ${source.isReleased}")
+ println(s"- sink.isReleased: ${sink.isReleased}")
+ }
+
+ println("PipeSource and PipeSink must release resources when when waiting for sink.take()")
+
+ {
+ val in = new DebugInputStream("aaa")
+ val (source, sink) = sourceSink()
+ source connectIn in
+ val f = Future {
+ source.ensureRunloopStarted()
+ source.release()
+ sink.release()
+ }
+ Await.result(f, Duration(5, SECONDS))
+ println(s"- in.closed: ${in.closed}")
+ println(s"- source.isReleased: ${source.isReleased}")
+ println(s"- sink.isReleased: ${sink.isReleased}")
+ }
+
+ println("PipeSource and PipeSink must release resources when copying stream")
+
+ {
+ val in = new DebugInfinityInputStream
+ val (source, sink) = sourceSink()
+ val out = new DebugOutputStream
+ source connectIn in
+ sink connectOut out
+ val f = Future {
+ source.ensureRunloopStarted()
+ sink.ensureRunloopStarted()
+ source.release()
+ sink.release()
+ }
+ Await.result(f, Duration(5, SECONDS))
+ println(s"- in.closed: ${in.closed}")
+ println(s"- out.closed: ${out.closed}")
+ println(s"- source.isReleased: ${source.isReleased}")
+ println(s"- sink.isReleased: ${sink.isReleased}")
+ }
+}
+
+package scala.sys.process {
+ import java.io._
+ import java.lang.reflect.InvocationTargetException
+
+ object debug {
+
+ class ProcessMock(error: Boolean) extends Process {
+ var destroyCount = 0
+ def exitValue(): Int = {
+ if (error) {
+ throw new InterruptedException()
+ }
+ 0
+ }
+ def destroy(): Unit = { destroyCount += 1 }
+ }
+
+ class ProcessBuilderMock(process: Process, error: Boolean) extends ProcessBuilder.AbstractBuilder {
+ override def run(io: ProcessIO): Process = {
+ if (error) {
+ throw new IOException()
+ }
+ process
+ }
+ }
+
+ class PipeSinkMock extends Process.PipeSink("PipeSinkMock") {
+ var releaseCount = 0
+ override val pipe = null
+ override val sink = null
+ override def run(): Unit = {}
+ override def connectOut(out: OutputStream): Unit = {}
+ override def connectIn(pipeOut: PipedOutputStream): Unit = {}
+ override def release(): Unit = { releaseCount += 1 }
+ }
+
+ class PipeSourceMock extends Process.PipeSource("PipeSourceMock") {
+ var releaseCount = 0
+ override val pipe = null
+ override val source = null
+ override def run(): Unit = {}
+ override def connectIn(in: InputStream): Unit = {}
+ override def connectOut(sink: Process.PipeSink): Unit = {}
+ override def release(): Unit = { releaseCount += 1 }
+ }
+
+ class PipedProcesses(a: ProcessBuilder, b: ProcessBuilder, defaultIO: ProcessIO, toError: Boolean)
+ extends Process.PipedProcesses(a, b, defaultIO, toError) {
+ def callRunAndExitValue(source: Process.PipeSource, sink: Process.PipeSink) = {
+ val m = classOf[Process.PipedProcesses].getDeclaredMethod("runAndExitValue", classOf[Process.PipeSource], classOf[Process.PipeSink])
+ m.setAccessible(true)
+ try m.invoke(this, source, sink).asInstanceOf[Option[Int]]
+ catch {
+ case err: InvocationTargetException => throw err.getTargetException
+ }
+ }
+ }
+
+ def throwsIOException(f: => Unit) = {
+ try { f; false }
+ catch { case _: IOException => true }
+ }
+
+ class PipeSink extends Process.PipeSink("TestPipeSink") {
+ def ensureRunloopStarted() = {
+ while (sink.size() > 0) {
+ Thread.sleep(1)
+ }
+ }
+ def isReleased = {
+ val field = classOf[Process.PipeSink].getDeclaredField("pipe")
+ field.setAccessible(true)
+ val pipe = field.get(this).asInstanceOf[PipedInputStream]
+ !this.isAlive && throwsIOException { pipe.read() }
+ }
+ }
+
+ class PipeSource extends Process.PipeSource("TestPipeSource") {
+ def ensureRunloopStarted() = {
+ while (source.size() > 0) {
+ Thread.sleep(1)
+ }
+ }
+ def isReleased = {
+ val field = classOf[Process.PipeSource].getDeclaredField("pipe")
+ field.setAccessible(true)
+ val pipe = field.get(this).asInstanceOf[PipedOutputStream]
+ !this.isAlive && throwsIOException { pipe.write(1) }
+ }
+ }
+ }
+} \ No newline at end of file