From fbcfba212fff76272c509c6781ea2a2897d84bff Mon Sep 17 00:00:00 2001 From: Som Snytt Date: Wed, 26 Oct 2016 11:45:55 -0700 Subject: SI-10007 sys.process thread sync A previous change to replace `SyncVar.set` with `SyncVar.put` breaks things. This commit tweaks the thread synchronizing in `sys.process` to actually use `SyncVar` to sync and pass a var. Joining the thread about to exit is superfluous. A result is put exactly once, and consumers use non-destructive `get`. Note that as usual, avoid kicking off threads in a static context, since class loading cycles are somewhat dicier with 2.12 lambdas. In particular, REPL is a static context by default. SI-10007 Clarify deprecation message The message on `set` was self-fulfilling, as it didn't hint that `put` has different semantics. So explain why `put` helps avoid errors instead of creating them. SI-10007 Always set exit value Always put a value to exit code, defaulting to None. Also clean up around tuple change to unfortunately named Future.apply. Very hard to follow those types. Date command pollutes output, so tweak test. --- src/library/scala/concurrent/SyncVar.scala | 4 +- .../scala/sys/process/ProcessBuilderImpl.scala | 14 +- src/library/scala/sys/process/ProcessImpl.scala | 34 +-- .../junit/scala/sys/process/PipedProcessTest.scala | 299 +++++++++++++++++++++ test/junit/scala/sys/process/ProcessTest.scala | 25 ++ test/junit/scala/sys/process/t7350.scala | 298 -------------------- 6 files changed, 352 insertions(+), 322 deletions(-) create mode 100644 test/junit/scala/sys/process/PipedProcessTest.scala create mode 100644 test/junit/scala/sys/process/ProcessTest.scala delete mode 100644 test/junit/scala/sys/process/t7350.scala diff --git a/src/library/scala/concurrent/SyncVar.scala b/src/library/scala/concurrent/SyncVar.scala index 5fabf553bd..0e534a9b22 100644 --- a/src/library/scala/concurrent/SyncVar.scala +++ b/src/library/scala/concurrent/SyncVar.scala @@ -91,7 +91,7 @@ class SyncVar[A] { // [Heather] the reason why: it doesn't take into consideration // whether or not the SyncVar is already defined. So, set has been // deprecated in order to eventually be able to make "setting" private - @deprecated("use `put` instead, as `set` is potentially error-prone", "2.10.0") + @deprecated("use `put` to ensure a value cannot be overwritten without a corresponding `take`", "2.10.0") // NOTE: Used by SBT 0.13.0-M2 and below def set(x: A): Unit = setVal(x) @@ -111,7 +111,7 @@ class SyncVar[A] { // [Heather] the reason why: it doesn't take into consideration // whether or not the SyncVar is already defined. So, unset has been // deprecated in order to eventually be able to make "unsetting" private - @deprecated("use `take` instead, as `unset` is potentially error-prone", "2.10.0") + @deprecated("use `take` to ensure a value is never discarded", "2.10.0") // NOTE: Used by SBT 0.13.0-M2 and below def unset(): Unit = synchronized { isDefined = false diff --git a/src/library/scala/sys/process/ProcessBuilderImpl.scala b/src/library/scala/sys/process/ProcessBuilderImpl.scala index eef140c16a..0df2e648e0 100644 --- a/src/library/scala/sys/process/ProcessBuilderImpl.scala +++ b/src/library/scala/sys/process/ProcessBuilderImpl.scala @@ -53,12 +53,14 @@ private[process] trait ProcessBuilderImpl { override def run(io: ProcessIO): Process = { val success = new SyncVar[Boolean] - success put false - val t = Spawn({ - runImpl(io) - success.put(true) - }, io.daemonizeThreads) - + def go(): Unit = { + var ok = false + try { + runImpl(io) + ok = true + } finally success.put(ok) + } + val t = Spawn(go(), io.daemonizeThreads) new ThreadProcess(t, success) } } diff --git a/src/library/scala/sys/process/ProcessImpl.scala b/src/library/scala/sys/process/ProcessImpl.scala index 6da0dee056..8a0002b316 100644 --- a/src/library/scala/sys/process/ProcessImpl.scala +++ b/src/library/scala/sys/process/ProcessImpl.scala @@ -86,17 +86,20 @@ private[process] trait ProcessImpl { private[process] abstract class CompoundProcess extends BasicProcess { def isAlive() = processThread.isAlive() def destroy() = destroyer() - def exitValue() = getExitValue._2() getOrElse scala.sys.error("No exit code: process destroyed.") - def start() = getExitValue + def exitValue() = futureValue() getOrElse scala.sys.error("No exit code: process destroyed.") + def start() = { futureThread ;() } - protected lazy val (processThread, getExitValue, destroyer) = { + protected lazy val (processThread, (futureThread, futureValue), destroyer) = { val code = new SyncVar[Option[Int]]() - code.put(None) - val thread = Spawn(code.put(runAndExitValue())) + val thread = Spawn { + var value: Option[Int] = None + try value = runAndExitValue() + finally code.put(value) + } ( thread, - Future { thread.join(); code.get }, + Future(code.get), // thread.join() () => thread.interrupt() ) } @@ -215,13 +218,15 @@ private[process] trait ProcessImpl { } /** A thin wrapper around a java.lang.Process. `ioThreads` are the Threads created to do I/O. - * The implementation of `exitValue` waits until these threads die before returning. */ + * The implementation of `exitValue` waits until these threads die before returning. + */ private[process] class DummyProcess(action: => Int) extends Process { - private[this] val exitCode = Future(action) - override def isAlive() = exitCode._1.isAlive() - override def exitValue() = exitCode._2() + private[this] val (thread, value) = Future(action) + override def isAlive() = thread.isAlive() + override def exitValue() = value() override def destroy() { } } + /** A thin wrapper around a java.lang.Process. `outputThreads` are the Threads created to read from the * output and error streams of the process. `inputThread` is the Thread created to write to the input stream of * the process. @@ -245,11 +250,8 @@ private[process] trait ProcessImpl { } } private[process] final class ThreadProcess(thread: Thread, success: SyncVar[Boolean]) extends Process { - override def isAlive() = thread.isAlive() - override def exitValue() = { - thread.join() - if (success.get) 0 else 1 - } - override def destroy() { thread.interrupt() } + override def isAlive() = thread.isAlive() + override def exitValue() = if (success.get) 0 else 1 // thread.join() + override def destroy() = thread.interrupt() } } diff --git a/test/junit/scala/sys/process/PipedProcessTest.scala b/test/junit/scala/sys/process/PipedProcessTest.scala new file mode 100644 index 0000000000..53f053e9aa --- /dev/null +++ b/test/junit/scala/sys/process/PipedProcessTest.scala @@ -0,0 +1,299 @@ +package scala.sys.process + +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 +import org.junit.Test +import java.io.{InputStream, OutputStream, PipedInputStream, PipedOutputStream, ByteArrayInputStream, + ByteArrayOutputStream, IOException, Closeable} +import java.lang.reflect.InvocationTargetException +import scala.concurrent.{Await, Future} +import scala.concurrent.duration.{Duration, SECONDS} +import scala.concurrent.ExecutionContext.Implicits.global +import scala.util.control.Exception.ignoring + +// Each test normally ends in a moment, but for failure cases, waits until one second. +// SI-7350, SI-8768 + +@RunWith(classOf[JUnit4]) +class PipedProcessTest { + class ProcessMock(error: Boolean) extends Process { + var destroyCount = 0 + def isAlive() = false + def exitValue(): Int = { + if (error) { + throw new InterruptedException() + } + 0 + } + def destroy(): Unit = { destroyCount += 1 } + } + + class ProcessBuilderMock(process: Process, error: Boolean) extends ProcessBuilder.AbstractBuilder { + override def run(io: ProcessIO): Process = { + if (error) { + throw new IOException() + } + process + } + } + + class PipeSinkMock extends Process.PipeSink("PipeSinkMock") { + var releaseCount = 0 + override val pipe = null + override val sink = null + override def run(): Unit = {} + override def connectOut(out: OutputStream): Unit = {} + override def connectIn(pipeOut: PipedOutputStream): Unit = {} + override def release(): Unit = { releaseCount += 1 } + } + + class PipeSourceMock extends Process.PipeSource("PipeSourceMock") { + var releaseCount = 0 + override val pipe = null + override val source = null + override def run(): Unit = {} + override def connectIn(in: InputStream): Unit = {} + override def connectOut(sink: Process.PipeSink): Unit = {} + override def release(): Unit = { releaseCount += 1 } + } + + class PipedProcesses(a: ProcessBuilder, b: ProcessBuilder, defaultIO: ProcessIO, toError: Boolean) + extends Process.PipedProcesses(a, b, defaultIO, toError) { + def callRunAndExitValue(source: Process.PipeSource, sink: Process.PipeSink) = { + val m = classOf[Process.PipedProcesses].getDeclaredMethod("runAndExitValue", classOf[Process.PipeSource], classOf[Process.PipeSink]) + m.setAccessible(true) + try m.invoke(this, source, sink).asInstanceOf[Option[Int]] + catch { + case err: InvocationTargetException => throw err.getTargetException + } + } + } + + // PipedProcesses need not to release resources when it normally end + @Test + def normallyEnd() { + val io = BasicIO(false, ProcessLogger(_ => ())) + val source = new PipeSourceMock + val sink = new PipeSinkMock + val a = new ProcessMock(error = false) + val b = new ProcessMock(error = false) + val p = new PipedProcesses(new ProcessBuilderMock(a, error = false), new ProcessBuilderMock(b, error = false), io, false) + val f = Future { + p.callRunAndExitValue(source, sink) + } + Await.result(f, Duration(1, SECONDS)) + assert(source.releaseCount == 0) + assert(sink.releaseCount == 0) + assert(a.destroyCount == 0) + assert(b.destroyCount == 0) + } + + // PipedProcesses must release resources when b.run() failed + @Test + def bFailed() { + val io = BasicIO(false, ProcessLogger(_ => ())) + val source = new PipeSourceMock + val sink = new PipeSinkMock + val a = new ProcessMock(error = false) + val b = new ProcessMock(error = false) + val p = new PipedProcesses(new ProcessBuilderMock(a, error = false), new ProcessBuilderMock(b, error = true), io, false) + val f = Future { + ignoring(classOf[IOException]) { + p.callRunAndExitValue(source, sink) + } + } + Await.result(f, Duration(1, SECONDS)) + assert(source.releaseCount == 1) + assert(sink.releaseCount == 1) + assert(a.destroyCount == 0) + assert(b.destroyCount == 0) + } + + // PipedProcesses must release resources when a.run() failed + @Test + def aFailed() { + val io = BasicIO(false, ProcessLogger(_ => ())) + val source = new PipeSourceMock + val sink = new PipeSinkMock + val a = new ProcessMock(error = false) + val b = new ProcessMock(error = false) + val p = new PipedProcesses(new ProcessBuilderMock(a, error = true), new ProcessBuilderMock(b, error = false), io, false) + val f = Future { + ignoring(classOf[IOException]) { + p.callRunAndExitValue(source, sink) + } + } + Await.result(f, Duration(1, SECONDS)) + assert(source.releaseCount == 1) + assert(sink.releaseCount == 1) + assert(a.destroyCount == 0) + assert(b.destroyCount == 1) + } + + // PipedProcesses must release resources when interrupted during waiting for first.exitValue() + @Test + def firstInterrupted() { + val io = BasicIO(false, ProcessLogger(_ => ())) + val source = new PipeSourceMock + val sink = new PipeSinkMock + val a = new ProcessMock(error = true) + val b = new ProcessMock(error = false) + val p = new PipedProcesses(new ProcessBuilderMock(a, error = false), new ProcessBuilderMock(b, error = false), io, false) + val f = Future { + p.callRunAndExitValue(source, sink) + } + Await.result(f, Duration(1, SECONDS)) + assert(source.releaseCount == 1) + assert(sink.releaseCount == 1) + assert(a.destroyCount == 1) + assert(b.destroyCount == 1) + } + + // PipedProcesses must release resources when interrupted during waiting for second.exitValue() + @Test + def secondInterrupted() { + val io = BasicIO(false, ProcessLogger(_ => ())) + val source = new PipeSourceMock + val sink = new PipeSinkMock + val a = new ProcessMock(error = false) + val b = new ProcessMock(error = true) + val p = new PipedProcesses(new ProcessBuilderMock(a, error = false), new ProcessBuilderMock(b, error = false), io, false) + val f = Future { + p.callRunAndExitValue(source, sink) + } + Await.result(f, Duration(1, SECONDS)) + assert(source.releaseCount == 1) + assert(sink.releaseCount == 1) + assert(a.destroyCount == 1) + assert(b.destroyCount == 1) + } +} + +@RunWith(classOf[JUnit4]) +class PipeSourceSinkTest { + def throwsIOException(f: => Unit) = { + try { f; false } + catch { case _: IOException => true } + } + + class PipeSink extends Process.PipeSink("TestPipeSink") { + def ensureRunloopStarted() = { + while (sink.size() > 0) { + Thread.sleep(1) + } + } + def isReleased = { + val field = classOf[Process.PipeSink].getDeclaredField("pipe") + field.setAccessible(true) + val pipe = field.get(this).asInstanceOf[PipedInputStream] + !this.isAlive && throwsIOException { pipe.read() } + } + } + + class PipeSource extends Process.PipeSource("TestPipeSource") { + def ensureRunloopStarted() = { + while (source.size() > 0) { + Thread.sleep(1) + } + } + def isReleased = { + val field = classOf[Process.PipeSource].getDeclaredField("pipe") + field.setAccessible(true) + val pipe = field.get(this).asInstanceOf[PipedOutputStream] + !this.isAlive && throwsIOException { pipe.write(1) } + } + } + + trait CloseChecking extends Closeable { + var closed = false + override def close() = closed = true + } + class DebugOutputStream extends ByteArrayOutputStream with CloseChecking + class DebugInputStream(s: String) extends ByteArrayInputStream(s.getBytes()) with CloseChecking + class DebugInfinityInputStream extends InputStream with CloseChecking { + def read() = 1 + } + + def sourceSink() = { + val source = new PipeSource + val sink = new PipeSink + source connectOut sink + source.start() + sink.start() + (source, sink) + } + + // PipeSource and PipeSink must release resources when it normally end + @Test + def normallyEnd() { + val in = new DebugInputStream("aaa") + val (source, sink) = sourceSink() + val out = new DebugOutputStream + source connectIn in + sink connectOut out + val f = Future { + source.join() + sink.join() + } + Await.result(f, Duration(1, SECONDS)) + assert(in.closed == true) + assert(out.closed == true) + assert(source.isReleased == true) + assert(sink.isReleased == true) + } + + // PipeSource and PipeSink must release resources when interrupted during waiting for source.take() + @Test + def sourceInterrupted() { + val (source, sink) = sourceSink() + val out = new DebugOutputStream + sink connectOut out + val f = Future { + sink.ensureRunloopStarted() + source.release() + sink.release() + } + Await.result(f, Duration(1, SECONDS)) + assert(out.closed == true) + assert(source.isReleased == true) + assert(sink.isReleased == true) + } + + // PipeSource and PipeSink must release resources when interrupted during waiting for sink.take() + @Test + def sinkInterrupted() { + val in = new DebugInputStream("aaa") + val (source, sink) = sourceSink() + source connectIn in + val f = Future { + source.ensureRunloopStarted() + source.release() + sink.release() + } + Await.result(f, Duration(1, SECONDS)) + assert(in.closed == true) + assert(source.isReleased == true) + assert(sink.isReleased == true) + } + + // PipeSource and PipeSink must release resources when interrupted during copy streams" + @Test + def runloopInterrupted() { + val in = new DebugInfinityInputStream + val (source, sink) = sourceSink() + val out = new DebugOutputStream + source connectIn in + sink connectOut out + val f = Future { + source.ensureRunloopStarted() + sink.ensureRunloopStarted() + source.release() + sink.release() + } + Await.result(f, Duration(1, SECONDS)) + assert(in.closed == true) + assert(out.closed == true) + assert(source.isReleased == true) + assert(sink.isReleased == true) + } +} diff --git a/test/junit/scala/sys/process/ProcessTest.scala b/test/junit/scala/sys/process/ProcessTest.scala new file mode 100644 index 0000000000..f6d779c2c8 --- /dev/null +++ b/test/junit/scala/sys/process/ProcessTest.scala @@ -0,0 +1,25 @@ +package scala.sys.process + +import java.io.ByteArrayInputStream +// should test from outside the package to ensure implicits work +//import scala.sys.process._ +import scala.util.Properties._ + +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 +import org.junit.Test +import org.junit.Assert.assertEquals + +@RunWith(classOf[JUnit4]) +class ProcessTest { + private def testily(body: => Unit) = if (!isWin) body + @Test def t10007(): Unit = testily { + val res = ("cat" #< new ByteArrayInputStream("lol".getBytes)).!! + assertEquals("lol\n", res) + } + // test non-hanging + @Test def t10055(): Unit = testily { + val res = ("cat" #< ( () => -1 ) ).! + assertEquals(0, res) + } +} diff --git a/test/junit/scala/sys/process/t7350.scala b/test/junit/scala/sys/process/t7350.scala deleted file mode 100644 index 9fdcac8ccc..0000000000 --- a/test/junit/scala/sys/process/t7350.scala +++ /dev/null @@ -1,298 +0,0 @@ -package scala.sys.process - -import org.junit.runner.RunWith -import org.junit.runners.JUnit4 -import org.junit.Test -import java.io.{InputStream, OutputStream, PipedInputStream, PipedOutputStream, ByteArrayInputStream, - ByteArrayOutputStream, IOException, Closeable} -import java.lang.reflect.InvocationTargetException -import scala.concurrent.{Await, Future} -import scala.concurrent.duration.{Duration, SECONDS} -import scala.concurrent.ExecutionContext.Implicits.global -import scala.util.control.Exception.ignoring - -// Each test normally ends in a moment, but for failure cases, waits until one second. - -@RunWith(classOf[JUnit4]) -class PipedProcessTest { - class ProcessMock(error: Boolean) extends Process { - var destroyCount = 0 - def isAlive() = false - def exitValue(): Int = { - if (error) { - throw new InterruptedException() - } - 0 - } - def destroy(): Unit = { destroyCount += 1 } - } - - class ProcessBuilderMock(process: Process, error: Boolean) extends ProcessBuilder.AbstractBuilder { - override def run(io: ProcessIO): Process = { - if (error) { - throw new IOException() - } - process - } - } - - class PipeSinkMock extends Process.PipeSink("PipeSinkMock") { - var releaseCount = 0 - override val pipe = null - override val sink = null - override def run(): Unit = {} - override def connectOut(out: OutputStream): Unit = {} - override def connectIn(pipeOut: PipedOutputStream): Unit = {} - override def release(): Unit = { releaseCount += 1 } - } - - class PipeSourceMock extends Process.PipeSource("PipeSourceMock") { - var releaseCount = 0 - override val pipe = null - override val source = null - override def run(): Unit = {} - override def connectIn(in: InputStream): Unit = {} - override def connectOut(sink: Process.PipeSink): Unit = {} - override def release(): Unit = { releaseCount += 1 } - } - - class PipedProcesses(a: ProcessBuilder, b: ProcessBuilder, defaultIO: ProcessIO, toError: Boolean) - extends Process.PipedProcesses(a, b, defaultIO, toError) { - def callRunAndExitValue(source: Process.PipeSource, sink: Process.PipeSink) = { - val m = classOf[Process.PipedProcesses].getDeclaredMethod("runAndExitValue", classOf[Process.PipeSource], classOf[Process.PipeSink]) - m.setAccessible(true) - try m.invoke(this, source, sink).asInstanceOf[Option[Int]] - catch { - case err: InvocationTargetException => throw err.getTargetException - } - } - } - - // PipedProcesses need not to release resources when it normally end - @Test - def normallyEnd() { - val io = BasicIO(false, ProcessLogger(_ => ())) - val source = new PipeSourceMock - val sink = new PipeSinkMock - val a = new ProcessMock(error = false) - val b = new ProcessMock(error = false) - val p = new PipedProcesses(new ProcessBuilderMock(a, error = false), new ProcessBuilderMock(b, error = false), io, false) - val f = Future { - p.callRunAndExitValue(source, sink) - } - Await.result(f, Duration(1, SECONDS)) - assert(source.releaseCount == 0) - assert(sink.releaseCount == 0) - assert(a.destroyCount == 0) - assert(b.destroyCount == 0) - } - - // PipedProcesses must release resources when b.run() failed - @Test - def bFailed() { - val io = BasicIO(false, ProcessLogger(_ => ())) - val source = new PipeSourceMock - val sink = new PipeSinkMock - val a = new ProcessMock(error = false) - val b = new ProcessMock(error = false) - val p = new PipedProcesses(new ProcessBuilderMock(a, error = false), new ProcessBuilderMock(b, error = true), io, false) - val f = Future { - ignoring(classOf[IOException]) { - p.callRunAndExitValue(source, sink) - } - } - Await.result(f, Duration(1, SECONDS)) - assert(source.releaseCount == 1) - assert(sink.releaseCount == 1) - assert(a.destroyCount == 0) - assert(b.destroyCount == 0) - } - - // PipedProcesses must release resources when a.run() failed - @Test - def aFailed() { - val io = BasicIO(false, ProcessLogger(_ => ())) - val source = new PipeSourceMock - val sink = new PipeSinkMock - val a = new ProcessMock(error = false) - val b = new ProcessMock(error = false) - val p = new PipedProcesses(new ProcessBuilderMock(a, error = true), new ProcessBuilderMock(b, error = false), io, false) - val f = Future { - ignoring(classOf[IOException]) { - p.callRunAndExitValue(source, sink) - } - } - Await.result(f, Duration(1, SECONDS)) - assert(source.releaseCount == 1) - assert(sink.releaseCount == 1) - assert(a.destroyCount == 0) - assert(b.destroyCount == 1) - } - - // PipedProcesses must release resources when interrupted during waiting for first.exitValue() - @Test - def firstInterrupted() { - val io = BasicIO(false, ProcessLogger(_ => ())) - val source = new PipeSourceMock - val sink = new PipeSinkMock - val a = new ProcessMock(error = true) - val b = new ProcessMock(error = false) - val p = new PipedProcesses(new ProcessBuilderMock(a, error = false), new ProcessBuilderMock(b, error = false), io, false) - val f = Future { - p.callRunAndExitValue(source, sink) - } - Await.result(f, Duration(1, SECONDS)) - assert(source.releaseCount == 1) - assert(sink.releaseCount == 1) - assert(a.destroyCount == 1) - assert(b.destroyCount == 1) - } - - // PipedProcesses must release resources when interrupted during waiting for second.exitValue() - @Test - def secondInterrupted() { - val io = BasicIO(false, ProcessLogger(_ => ())) - val source = new PipeSourceMock - val sink = new PipeSinkMock - val a = new ProcessMock(error = false) - val b = new ProcessMock(error = true) - val p = new PipedProcesses(new ProcessBuilderMock(a, error = false), new ProcessBuilderMock(b, error = false), io, false) - val f = Future { - p.callRunAndExitValue(source, sink) - } - Await.result(f, Duration(1, SECONDS)) - assert(source.releaseCount == 1) - assert(sink.releaseCount == 1) - assert(a.destroyCount == 1) - assert(b.destroyCount == 1) - } -} - -@RunWith(classOf[JUnit4]) -class PipeSourceSinkTest { - def throwsIOException(f: => Unit) = { - try { f; false } - catch { case _: IOException => true } - } - - class PipeSink extends Process.PipeSink("TestPipeSink") { - def ensureRunloopStarted() = { - while (sink.size() > 0) { - Thread.sleep(1) - } - } - def isReleased = { - val field = classOf[Process.PipeSink].getDeclaredField("pipe") - field.setAccessible(true) - val pipe = field.get(this).asInstanceOf[PipedInputStream] - !this.isAlive && throwsIOException { pipe.read() } - } - } - - class PipeSource extends Process.PipeSource("TestPipeSource") { - def ensureRunloopStarted() = { - while (source.size() > 0) { - Thread.sleep(1) - } - } - def isReleased = { - val field = classOf[Process.PipeSource].getDeclaredField("pipe") - field.setAccessible(true) - val pipe = field.get(this).asInstanceOf[PipedOutputStream] - !this.isAlive && throwsIOException { pipe.write(1) } - } - } - - trait CloseChecking extends Closeable { - var closed = false - override def close() = closed = true - } - class DebugOutputStream extends ByteArrayOutputStream with CloseChecking - class DebugInputStream(s: String) extends ByteArrayInputStream(s.getBytes()) with CloseChecking - class DebugInfinityInputStream extends InputStream with CloseChecking { - def read() = 1 - } - - def sourceSink() = { - val source = new PipeSource - val sink = new PipeSink - source connectOut sink - source.start() - sink.start() - (source, sink) - } - - // PipeSource and PipeSink must release resources when it normally end - @Test - def normallyEnd() { - val in = new DebugInputStream("aaa") - val (source, sink) = sourceSink() - val out = new DebugOutputStream - source connectIn in - sink connectOut out - val f = Future { - source.join() - sink.join() - } - Await.result(f, Duration(1, SECONDS)) - assert(in.closed == true) - assert(out.closed == true) - assert(source.isReleased == true) - assert(sink.isReleased == true) - } - - // PipeSource and PipeSink must release resources when interrupted during waiting for source.take() - @Test - def sourceInterrupted() { - val (source, sink) = sourceSink() - val out = new DebugOutputStream - sink connectOut out - val f = Future { - sink.ensureRunloopStarted() - source.release() - sink.release() - } - Await.result(f, Duration(1, SECONDS)) - assert(out.closed == true) - assert(source.isReleased == true) - assert(sink.isReleased == true) - } - - // PipeSource and PipeSink must release resources when interrupted during waiting for sink.take() - @Test - def sinkInterrupted() { - val in = new DebugInputStream("aaa") - val (source, sink) = sourceSink() - source connectIn in - val f = Future { - source.ensureRunloopStarted() - source.release() - sink.release() - } - Await.result(f, Duration(1, SECONDS)) - assert(in.closed == true) - assert(source.isReleased == true) - assert(sink.isReleased == true) - } - - // PipeSource and PipeSink must release resources when interrupted during copy streams" - @Test - def runloopInterrupted() { - val in = new DebugInfinityInputStream - val (source, sink) = sourceSink() - val out = new DebugOutputStream - source connectIn in - sink connectOut out - val f = Future { - source.ensureRunloopStarted() - sink.ensureRunloopStarted() - source.release() - sink.release() - } - Await.result(f, Duration(1, SECONDS)) - assert(in.closed == true) - assert(out.closed == true) - assert(source.isReleased == true) - assert(sink.isReleased == true) - } -} -- cgit v1.2.3