From 70c3360d6f49f28216f28b7e25f61142481db9e6 Mon Sep 17 00:00:00 2001 From: rubyu Date: Thu, 7 Aug 2014 15:39:25 +0900 Subject: SI-7350 Prevent resource leaks in PipedProcesses.runAndExitValue() --- src/library/scala/sys/process/BasicIO.scala | 2 +- src/library/scala/sys/process/ProcessImpl.scala | 134 ++++++----- src/library/scala/sys/process/package.scala | 30 ++- test/files/run/t7350.check | 43 ++++ test/files/run/t7350.scala | 302 ++++++++++++++++++++++++ 5 files changed, 438 insertions(+), 73 deletions(-) create mode 100644 test/files/run/t7350.check create mode 100644 test/files/run/t7350.scala diff --git a/src/library/scala/sys/process/BasicIO.scala b/src/library/scala/sys/process/BasicIO.scala index b31bbf0540..866dac4458 100644 --- a/src/library/scala/sys/process/BasicIO.scala +++ b/src/library/scala/sys/process/BasicIO.scala @@ -221,7 +221,7 @@ object BasicIO { */ def transferFully(in: InputStream, out: OutputStream): Unit = try transferFullyImpl(in, out) - catch onInterrupt(()) + catch onIOInterrupt(()) private[this] def appendLine(buffer: Appendable): String => Unit = line => { buffer append line diff --git a/src/library/scala/sys/process/ProcessImpl.scala b/src/library/scala/sys/process/ProcessImpl.scala index 2b7fcdeb73..d15f1a2b3d 100644 --- a/src/library/scala/sys/process/ProcessImpl.scala +++ b/src/library/scala/sys/process/ProcessImpl.scala @@ -109,45 +109,46 @@ private[process] trait ProcessImpl { } private[process] class PipedProcesses(a: ProcessBuilder, b: ProcessBuilder, defaultIO: ProcessIO, toError: Boolean) extends CompoundProcess { - protected[this] override def runAndExitValue() = { - val currentSource = new SyncVar[Option[InputStream]] - val pipeOut = new PipedOutputStream - val source = new PipeSource(currentSource, pipeOut, a.toString) + protected[this] override def runAndExitValue() = runAndExitValue(new PipeSource(a.toString), new PipeSink(b.toString)) + protected[this] def runAndExitValue(source: PipeSource, sink: PipeSink): Option[Int] = { + source connectOut sink source.start() - - val pipeIn = new PipedInputStream(pipeOut) - val currentSink = new SyncVar[Option[OutputStream]] - val sink = new PipeSink(pipeIn, currentSink, b.toString) sink.start() - def handleOutOrError(fromOutput: InputStream) = currentSource put Some(fromOutput) + /** Release PipeSource, PipeSink and Process in the correct order. + * If once connect Process with Source or Sink, then the order of releasing them + * must be Source -> Sink -> Process, otherwise IOException will be thrown. */ + def releaseResources(so: PipeSource, sk: PipeSink, p: Process *) = { + so.release() + sk.release() + p foreach( _.destroy() ) + } val firstIO = - if (toError) - defaultIO.withError(handleOutOrError) - else - defaultIO.withOutput(handleOutOrError) - val secondIO = defaultIO.withInput(toInput => currentSink put Some(toInput)) - - val second = b.run(secondIO) - val first = a.run(firstIO) - try { - runInterruptible { - val exit1 = first.exitValue() - currentSource put None - currentSink put None - val exit2 = second.exitValue() - // Since file redirection (e.g. #>) is implemented as a piped process, - // we ignore its exit value so cmd #> file doesn't always return 0. - if (b.hasExitValue) exit2 else exit1 - } { - first.destroy() - second.destroy() + if (toError) defaultIO.withError(source.connectIn) + else defaultIO.withOutput(source.connectIn) + val secondIO = defaultIO.withInput(sink.connectOut) + + val second = + try b.run(secondIO) + catch onError { err => + releaseResources(source, sink) + throw err } - } - finally { - BasicIO close pipeIn - BasicIO close pipeOut + val first = + try a.run(firstIO) + catch onError { err => + releaseResources(source, sink, second) + throw err + } + runInterruptible { + val exit1 = first.exitValue() + val exit2 = second.exitValue() + // Since file redirection (e.g. #>) is implemented as a piped process, + // we ignore its exit value so cmd #> file doesn't always return 0. + if (b.hasExitValue) exit2 else exit1 + } { + releaseResources(source, sink, first, second) } } } @@ -168,37 +169,46 @@ private[process] trait ProcessImpl { } } - private[process] class PipeSource( - currentSource: SyncVar[Option[InputStream]], - pipe: PipedOutputStream, - label: => String - ) extends PipeThread(false, () => label) { - - final override def run(): Unit = currentSource.get match { - case Some(source) => - try runloop(source, pipe) - finally currentSource.unset() - - run() - case None => - currentSource.unset() - BasicIO close pipe + private[process] class PipeSource(label: => String) extends PipeThread(false, () => label) { + protected[this] val pipe = new PipedOutputStream + protected[this] val source = new LinkedBlockingQueue[Option[InputStream]] + override def run(): Unit = { + try { + source.take match { + case Some(in) => runloop(in, pipe) + case None => + } + } + catch onInterrupt(()) + finally BasicIO close pipe + } + def connectIn(in: InputStream): Unit = source add Some(in) + def connectOut(sink: PipeSink): Unit = sink connectIn pipe + def release(): Unit = { + interrupt() + source add None + join() } } - private[process] class PipeSink( - pipe: PipedInputStream, - currentSink: SyncVar[Option[OutputStream]], - label: => String - ) extends PipeThread(true, () => label) { - - final override def run(): Unit = currentSink.get match { - case Some(sink) => - try runloop(pipe, sink) - finally currentSink.unset() - - run() - case None => - currentSink.unset() + private[process] class PipeSink(label: => String) extends PipeThread(true, () => label) { + protected[this] val pipe = new PipedInputStream + protected[this] val sink = new LinkedBlockingQueue[Option[OutputStream]] + override def run(): Unit = { + try { + sink.take match { + case Some(out) => runloop(pipe, out) + case None => + } + } + catch onInterrupt(()) + finally BasicIO close pipe + } + def connectOut(out: OutputStream): Unit = sink add Some(out) + def connectIn(pipeOut: PipedOutputStream): Unit = pipe connect pipeOut + def release(): Unit = { + interrupt() + sink add None + join() } } diff --git a/src/library/scala/sys/process/package.scala b/src/library/scala/sys/process/package.scala index 1340a6c415..91fa99e3df 100644 --- a/src/library/scala/sys/process/package.scala +++ b/src/library/scala/sys/process/package.scala @@ -224,16 +224,26 @@ package scala.sys { final val processDebug = props contains "scala.process.debug" dbg("Initializing process package.") - type =?>[-A, +B] = PartialFunction[A, B] - type Closeable = java.io.Closeable - type File = java.io.File - type IOException = java.io.IOException - type InputStream = java.io.InputStream - type JProcess = java.lang.Process - type JProcessBuilder = java.lang.ProcessBuilder - type OutputStream = java.io.OutputStream - type SyncVar[T] = scala.concurrent.SyncVar[T] - type URL = java.net.URL + type =?>[-A, +B] = PartialFunction[A, B] + type Closeable = java.io.Closeable + type File = java.io.File + type IOException = java.io.IOException + type InterruptedIOException = java.io.InterruptedIOException + type InputStream = java.io.InputStream + type JProcess = java.lang.Process + type JProcessBuilder = java.lang.ProcessBuilder + type LinkedBlockingQueue[T] = java.util.concurrent.LinkedBlockingQueue[T] + type OutputStream = java.io.OutputStream + type SyncVar[T] = scala.concurrent.SyncVar[T] + type URL = java.net.URL + + def onError[T](handler: Throwable => T): Throwable =?> T = { + case e @ _ => handler(e) + } + + def onIOInterrupt[T](handler: => T): Throwable =?> T = { + case _: InterruptedIOException => handler + } def onInterrupt[T](handler: => T): Throwable =?> T = { case _: InterruptedException => handler diff --git a/test/files/run/t7350.check b/test/files/run/t7350.check new file mode 100644 index 0000000000..5959e04400 --- /dev/null +++ b/test/files/run/t7350.check @@ -0,0 +1,43 @@ +PipedProcesses need not to release resources when it normally ends +- source.releaseCount: 0 +- sink.releaseCount: 0 +- a.destroyCount: 0 +- b.destroyCount: 0 +PipedProcesses must release resources when b.run() fails +- source.releaseCount: 1 +- sink.releaseCount: 1 +- a.destroyCount: 0 +- b.destroyCount: 0 +PipedProcesses must release resources when a.run() fails +- source.releaseCount: 1 +- sink.releaseCount: 1 +- a.destroyCount: 0 +- b.destroyCount: 1 +PipedProcesses must release resources when interrupted waiting for first.exitValue() +- source.releaseCount: 1 +- sink.releaseCount: 1 +- a.destroyCount: 1 +- b.destroyCount: 1 +PipedProcesses must release resources when interrupted waiting for second.exitValue() +- source.releaseCount: 1 +- sink.releaseCount: 1 +- a.destroyCount: 1 +- b.destroyCount: 1 +PipeSource and PipeSink must release resources when it normally ends +- in.closed: true +- out.closed: true +- source.isReleased: true +- sink.isReleased: true +PipeSource and PipeSink must release resources when when waiting for source.take() +- out.closed: true +- source.isReleased: true +- sink.isReleased: true +PipeSource and PipeSink must release resources when when waiting for sink.take() +- in.closed: true +- source.isReleased: true +- sink.isReleased: true +PipeSource and PipeSink must release resources when copying stream +- in.closed: true +- out.closed: true +- source.isReleased: true +- sink.isReleased: true diff --git a/test/files/run/t7350.scala b/test/files/run/t7350.scala new file mode 100644 index 0000000000..156d2f0719 --- /dev/null +++ b/test/files/run/t7350.scala @@ -0,0 +1,302 @@ + +import scala.sys.process.debug._ +import scala.sys.process.{ProcessLogger, BasicIO} +import java.io.{Closeable, ByteArrayInputStream, ByteArrayOutputStream, InputStream, IOException} +import scala.concurrent.{Future, Await} +import scala.concurrent.duration.{Duration, SECONDS} +import scala.util.control.Exception._ +import scala.concurrent.ExecutionContext.Implicits.global + +object Test extends App { + // Each test normally ends in a moment, but for a worst case, + // waits until 5 seconds and breaks off. + + // Tests for PipedProcesses + + println("PipedProcesses need not to release resources when it normally ends") + + { + val io = BasicIO(false, ProcessLogger(_ => ())) + val source = new PipeSourceMock + val sink = new PipeSinkMock + val a = new ProcessMock(error = false) + val b = new ProcessMock(error = false) + val p = new PipedProcesses(new ProcessBuilderMock(a, error = false), new ProcessBuilderMock(b, error = false), io, false) + val f = Future { + p.callRunAndExitValue(source, sink) + } + Await.result(f, Duration(5, SECONDS)) + println(s"- source.releaseCount: ${source.releaseCount}") + println(s"- sink.releaseCount: ${sink.releaseCount}") + println(s"- a.destroyCount: ${a.destroyCount}") + println(s"- b.destroyCount: ${b.destroyCount}") + } + + println("PipedProcesses must release resources when b.run() fails") + + { + val io = BasicIO(false, ProcessLogger(_ => ())) + val source = new PipeSourceMock + val sink = new PipeSinkMock + val a = new ProcessMock(error = false) + val b = new ProcessMock(error = false) + val p = new PipedProcesses(new ProcessBuilderMock(a, error = false), new ProcessBuilderMock(b, error = true), io, false) + val f = Future { + ignoring(classOf[IOException]) { + p.callRunAndExitValue(source, sink) + } + } + Await.result(f, Duration(5, SECONDS)) + println(s"- source.releaseCount: ${source.releaseCount}") + println(s"- sink.releaseCount: ${sink.releaseCount}") + println(s"- a.destroyCount: ${a.destroyCount}") + println(s"- b.destroyCount: ${b.destroyCount}") + } + + println("PipedProcesses must release resources when a.run() fails") + + { + val io = BasicIO(false, ProcessLogger(_ => ())) + val source = new PipeSourceMock + val sink = new PipeSinkMock + val a = new ProcessMock(error = false) + val b = new ProcessMock(error = false) + val p = new PipedProcesses(new ProcessBuilderMock(a, error = true), new ProcessBuilderMock(b, error = false), io, false) + val f = Future { + ignoring(classOf[IOException]) { + p.callRunAndExitValue(source, sink) + } + } + Await.result(f, Duration(5, SECONDS)) + println(s"- source.releaseCount: ${source.releaseCount}") + println(s"- sink.releaseCount: ${sink.releaseCount}") + println(s"- a.destroyCount: ${a.destroyCount}") + println(s"- b.destroyCount: ${b.destroyCount}") + } + + println("PipedProcesses must release resources when interrupted waiting for first.exitValue()") + + { + val io = BasicIO(false, ProcessLogger(_ => ())) + val source = new PipeSourceMock + val sink = new PipeSinkMock + val a = new ProcessMock(error = true) + val b = new ProcessMock(error = false) + val p = new PipedProcesses(new ProcessBuilderMock(a, error = false), new ProcessBuilderMock(b, error = false), io, false) + val f = Future { + p.callRunAndExitValue(source, sink) + } + Await.result(f, Duration(5, SECONDS)) + println(s"- source.releaseCount: ${source.releaseCount}") + println(s"- sink.releaseCount: ${sink.releaseCount}") + println(s"- a.destroyCount: ${a.destroyCount}") + println(s"- b.destroyCount: ${b.destroyCount}") + } + + println("PipedProcesses must release resources when interrupted waiting for second.exitValue()") + + { + val io = BasicIO(false, ProcessLogger(_ => ())) + val source = new PipeSourceMock + val sink = new PipeSinkMock + val a = new ProcessMock(error = false) + val b = new ProcessMock(error = true) + val p = new PipedProcesses(new ProcessBuilderMock(a, error = false), new ProcessBuilderMock(b, error = false), io, false) + val f = Future { + p.callRunAndExitValue(source, sink) + } + Await.result(f, Duration(5, SECONDS)) + println(s"- source.releaseCount: ${source.releaseCount}") + println(s"- sink.releaseCount: ${sink.releaseCount}") + println(s"- a.destroyCount: ${a.destroyCount}") + println(s"- b.destroyCount: ${b.destroyCount}") + } + + // Tests for PipeSource and PipeSink + + trait CloseChecking extends Closeable { + var closed = false + override def close() = closed = true + } + class DebugOutputStream extends ByteArrayOutputStream with CloseChecking + class DebugInputStream(s: String) extends ByteArrayInputStream(s.getBytes()) with CloseChecking + class DebugInfinityInputStream extends InputStream with CloseChecking { + def read() = 1 + } + + def sourceSink() = { + val source = new PipeSource + val sink = new PipeSink + source connectOut sink + source.start() + sink.start() + (source, sink) + } + + println("PipeSource and PipeSink must release resources when it normally ends") + + { + val in = new DebugInputStream("aaa") + val (source, sink) = sourceSink() + val out = new DebugOutputStream + source connectIn in + sink connectOut out + val f = Future { + source.join() + sink.join() + } + Await.result(f, Duration(5, SECONDS)) + println(s"- in.closed: ${in.closed}") + println(s"- out.closed: ${out.closed}") + println(s"- source.isReleased: ${source.isReleased}") + println(s"- sink.isReleased: ${sink.isReleased}") + } + + println("PipeSource and PipeSink must release resources when when waiting for source.take()") + + { + val (source, sink) = sourceSink() + val out = new DebugOutputStream + sink connectOut out + val f = Future { + sink.ensureRunloopStarted() + source.release() + sink.release() + } + Await.result(f, Duration(5, SECONDS)) + println(s"- out.closed: ${out.closed}") + println(s"- source.isReleased: ${source.isReleased}") + println(s"- sink.isReleased: ${sink.isReleased}") + } + + println("PipeSource and PipeSink must release resources when when waiting for sink.take()") + + { + val in = new DebugInputStream("aaa") + val (source, sink) = sourceSink() + source connectIn in + val f = Future { + source.ensureRunloopStarted() + source.release() + sink.release() + } + Await.result(f, Duration(5, SECONDS)) + println(s"- in.closed: ${in.closed}") + println(s"- source.isReleased: ${source.isReleased}") + println(s"- sink.isReleased: ${sink.isReleased}") + } + + println("PipeSource and PipeSink must release resources when copying stream") + + { + val in = new DebugInfinityInputStream + val (source, sink) = sourceSink() + val out = new DebugOutputStream + source connectIn in + sink connectOut out + val f = Future { + source.ensureRunloopStarted() + sink.ensureRunloopStarted() + source.release() + sink.release() + } + Await.result(f, Duration(5, SECONDS)) + println(s"- in.closed: ${in.closed}") + println(s"- out.closed: ${out.closed}") + println(s"- source.isReleased: ${source.isReleased}") + println(s"- sink.isReleased: ${sink.isReleased}") + } +} + +package scala.sys.process { + import java.io._ + import java.lang.reflect.InvocationTargetException + + object debug { + + class ProcessMock(error: Boolean) extends Process { + var destroyCount = 0 + def exitValue(): Int = { + if (error) { + throw new InterruptedException() + } + 0 + } + def destroy(): Unit = { destroyCount += 1 } + } + + class ProcessBuilderMock(process: Process, error: Boolean) extends ProcessBuilder.AbstractBuilder { + override def run(io: ProcessIO): Process = { + if (error) { + throw new IOException() + } + process + } + } + + class PipeSinkMock extends Process.PipeSink("PipeSinkMock") { + var releaseCount = 0 + override val pipe = null + override val sink = null + override def run(): Unit = {} + override def connectOut(out: OutputStream): Unit = {} + override def connectIn(pipeOut: PipedOutputStream): Unit = {} + override def release(): Unit = { releaseCount += 1 } + } + + class PipeSourceMock extends Process.PipeSource("PipeSourceMock") { + var releaseCount = 0 + override val pipe = null + override val source = null + override def run(): Unit = {} + override def connectIn(in: InputStream): Unit = {} + override def connectOut(sink: Process.PipeSink): Unit = {} + override def release(): Unit = { releaseCount += 1 } + } + + class PipedProcesses(a: ProcessBuilder, b: ProcessBuilder, defaultIO: ProcessIO, toError: Boolean) + extends Process.PipedProcesses(a, b, defaultIO, toError) { + def callRunAndExitValue(source: Process.PipeSource, sink: Process.PipeSink) = { + val m = classOf[Process.PipedProcesses].getDeclaredMethod("runAndExitValue", classOf[Process.PipeSource], classOf[Process.PipeSink]) + m.setAccessible(true) + try m.invoke(this, source, sink).asInstanceOf[Option[Int]] + catch { + case err: InvocationTargetException => throw err.getTargetException + } + } + } + + def throwsIOException(f: => Unit) = { + try { f; false } + catch { case _: IOException => true } + } + + class PipeSink extends Process.PipeSink("TestPipeSink") { + def ensureRunloopStarted() = { + while (sink.size() > 0) { + Thread.sleep(1) + } + } + def isReleased = { + val field = classOf[Process.PipeSink].getDeclaredField("pipe") + field.setAccessible(true) + val pipe = field.get(this).asInstanceOf[PipedInputStream] + !this.isAlive && throwsIOException { pipe.read() } + } + } + + class PipeSource extends Process.PipeSource("TestPipeSource") { + def ensureRunloopStarted() = { + while (source.size() > 0) { + Thread.sleep(1) + } + } + def isReleased = { + val field = classOf[Process.PipeSource].getDeclaredField("pipe") + field.setAccessible(true) + val pipe = field.get(this).asInstanceOf[PipedOutputStream] + !this.isAlive && throwsIOException { pipe.write(1) } + } + } + } +} \ No newline at end of file -- cgit v1.2.3