// Source path: test/junit/scala/sys/process/PipedProcessTest.scala
// (blob 3f403dbe75863f23beae1a90763c29820a03a82c)
package scala.sys.process

import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.junit.Test
import java.io.{InputStream, OutputStream, PipedInputStream, PipedOutputStream, ByteArrayInputStream,
  ByteArrayOutputStream, IOException, Closeable}
import java.lang.reflect.InvocationTargetException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.control.Exception.ignoring

// Each test normally ends in a moment, but for failure cases, waits two seconds.
// SI-7350, SI-8768

// one second wasn't always enough --
// https://github.com/scala/scala-dev/issues/313
/** Timeouts shared by the tests in this file. */
object TestDuration {
  import java.util.concurrent.TimeUnit
  import scala.concurrent.duration.FiniteDuration

  /** How long a test waits on its `Future` before giving up: two seconds. */
  val Standard: FiniteDuration = FiniteDuration(2L, TimeUnit.SECONDS)
}

/** Checks which resources `Process.PipedProcesses` releases or destroys, depending on how its
  * private `runAndExitValue` method terminates.
  *
  * Every collaborator is a mock that only counts calls, so no real process is started. Each test
  * runs the call inside a `Future` and waits at most `TestDuration.Standard` for it.
  */
@RunWith(classOf[JUnit4])
class PipedProcessTest {
  /** `Process` stand-in that counts `destroy()` calls.
    *
    * @param error when true, `exitValue()` throws `InterruptedException` instead of returning 0
    */
  class ProcessMock(error: Boolean) extends Process {
    var destroyCount = 0
    def isAlive(): Boolean = false
    def exitValue(): Int = {
      if (error) {
        throw new InterruptedException()
      }
      0
    }
    def destroy(): Unit = { destroyCount += 1 }
  }

  /** Builder stand-in whose `run` either hands back `process` or fails.
    *
    * @param process the process returned by a successful `run`
    * @param error   when true, `run` throws `IOException` instead of returning `process`
    */
  class ProcessBuilderMock(process: Process, error: Boolean) extends ProcessBuilder.AbstractBuilder {
    override def run(io: ProcessIO): Process = {
      if (error) {
        throw new IOException()
      }
      process
    }
  }

  /** `PipeSink` stand-in: `run` and the connect methods do nothing, `release()` is only counted. */
  class PipeSinkMock extends Process.PipeSink("PipeSinkMock") {
    var releaseCount = 0
    override val pipe = null
    override val sink = null
    override def run(): Unit = {}
    override def connectOut(out: OutputStream): Unit = {}
    override def connectIn(pipeOut: PipedOutputStream): Unit = {}
    override def release(): Unit = { releaseCount += 1 }
  }

  /** `PipeSource` stand-in: `run` and the connect methods do nothing, `release()` is only counted. */
  class PipeSourceMock extends Process.PipeSource("PipeSourceMock") {
    var releaseCount = 0
    override val pipe = null
    override val source = null
    override def run(): Unit = {}
    override def connectIn(in: InputStream): Unit = {}
    override def connectOut(sink: Process.PipeSink): Unit = {}
    override def release(): Unit = { releaseCount += 1 }
  }

  /** Subclass of `Process.PipedProcesses` that exposes `runAndExitValue` to the tests. */
  class PipedProcesses(a: ProcessBuilder, b: ProcessBuilder, defaultIO: ProcessIO, toError: Boolean)
    extends Process.PipedProcesses(a, b, defaultIO, toError) {
    /** Invokes `runAndExitValue(source, sink)` reflectively.
      *
      * An exception thrown by the target method arrives wrapped in `InvocationTargetException`;
      * it is unwrapped and rethrown so callers see the original exception.
      */
    def callRunAndExitValue(source: Process.PipeSource, sink: Process.PipeSink): Option[Int] = {
      val m = classOf[Process.PipedProcesses].getDeclaredMethod("runAndExitValue", classOf[Process.PipeSource], classOf[Process.PipeSink])
      m.setAccessible(true)
      try m.invoke(this, source, sink).asInstanceOf[Option[Int]]
      catch {
        case err: InvocationTargetException => throw err.getTargetException
      }
    }
  }

  // PipedProcesses need not release resources when it ends normally
  @Test
  def normallyEnd(): Unit = {
    val io = BasicIO(false, ProcessLogger(_ => ()))
    val source = new PipeSourceMock
    val sink = new PipeSinkMock
    val a = new ProcessMock(error = false)
    val b = new ProcessMock(error = false)
    val p = new PipedProcesses(new ProcessBuilderMock(a, error = false), new ProcessBuilderMock(b, error = false), io, false)
    val f = Future {
      p.callRunAndExitValue(source, sink)
    }
    Await.result(f, TestDuration.Standard)
    assert(source.releaseCount == 0)
    assert(sink.releaseCount == 0)
    assert(a.destroyCount == 0)
    assert(b.destroyCount == 0)
  }

  // PipedProcesses must release resources when b.run() fails
  @Test
  def bFailed(): Unit = {
    val io = BasicIO(false, ProcessLogger(_ => ()))
    val source = new PipeSourceMock
    val sink = new PipeSinkMock
    val a = new ProcessMock(error = false)
    val b = new ProcessMock(error = false)
    val p = new PipedProcesses(new ProcessBuilderMock(a, error = false), new ProcessBuilderMock(b, error = true), io, false)
    val f = Future {
      // the IOException thrown by the failing builder is expected; only the cleanup is checked
      ignoring(classOf[IOException]) {
        p.callRunAndExitValue(source, sink)
      }
    }
    Await.result(f, TestDuration.Standard)
    assert(source.releaseCount == 1)
    assert(sink.releaseCount == 1)
    assert(a.destroyCount == 0)
    assert(b.destroyCount == 0)
  }

  // PipedProcesses must release resources when a.run() fails
  @Test
  def aFailed(): Unit = {
    val io = BasicIO(false, ProcessLogger(_ => ()))
    val source = new PipeSourceMock
    val sink = new PipeSinkMock
    val a = new ProcessMock(error = false)
    val b = new ProcessMock(error = false)
    val p = new PipedProcesses(new ProcessBuilderMock(a, error = true), new ProcessBuilderMock(b, error = false), io, false)
    val f = Future {
      // the IOException thrown by the failing builder is expected; only the cleanup is checked
      ignoring(classOf[IOException]) {
        p.callRunAndExitValue(source, sink)
      }
    }
    Await.result(f, TestDuration.Standard)
    assert(source.releaseCount == 1)
    assert(sink.releaseCount == 1)
    assert(a.destroyCount == 0)
    assert(b.destroyCount == 1)
  }

  // PipedProcesses must release resources when interrupted while waiting for first.exitValue()
  @Test
  def firstInterrupted(): Unit = {
    val io = BasicIO(false, ProcessLogger(_ => ()))
    val source = new PipeSourceMock
    val sink = new PipeSinkMock
    val a = new ProcessMock(error = true)
    val b = new ProcessMock(error = false)
    val p = new PipedProcesses(new ProcessBuilderMock(a, error = false), new ProcessBuilderMock(b, error = false), io, false)
    val f = Future {
      p.callRunAndExitValue(source, sink)
    }
    Await.result(f, TestDuration.Standard)
    assert(source.releaseCount == 1)
    assert(sink.releaseCount == 1)
    assert(a.destroyCount == 1)
    assert(b.destroyCount == 1)
  }

  // PipedProcesses must release resources when interrupted while waiting for second.exitValue()
  @Test
  def secondInterrupted(): Unit = {
    val io = BasicIO(false, ProcessLogger(_ => ()))
    val source = new PipeSourceMock
    val sink = new PipeSinkMock
    val a = new ProcessMock(error = false)
    val b = new ProcessMock(error = true)
    val p = new PipedProcesses(new ProcessBuilderMock(a, error = false), new ProcessBuilderMock(b, error = false), io, false)
    val f = Future {
      p.callRunAndExitValue(source, sink)
    }
    Await.result(f, TestDuration.Standard)
    assert(source.releaseCount == 1)
    assert(sink.releaseCount == 1)
    assert(a.destroyCount == 1)
    assert(b.destroyCount == 1)
  }
}

/** Checks that `Process.PipeSource` and `Process.PipeSink` threads close the streams connected to
  * them and stop running, both on normal completion and when `release()` is called at different
  * stages.
  */
@RunWith(classOf[JUnit4])
class PipeSourceSinkTest {
  /** Evaluates `f` and reports whether it threw an `IOException`; other exceptions propagate. */
  def throwsIOException(f: => Unit): Boolean = {
    try { f; false }
    catch { case _: IOException => true }
  }

  /** `Process.PipeSink` with extra probes for the tests. */
  class PipeSink extends Process.PipeSink("TestPipeSink") {
    /** Polls (sleeping 1 ms per round) until `sink.size()` is zero.
      * NOTE(review): presumably this means the thread has picked up the connected stream —
      * confirm against `Process.PipeSink`.
      */
    def ensureRunloopStarted(): Unit = {
      while (sink.size() > 0) {
        Thread.sleep(1)
      }
    }

    /** True when this thread is no longer alive and reading from the `pipe` field (fetched
      * reflectively from `Process.PipeSink`) fails with an `IOException`.
      */
    def isReleased: Boolean = {
      val field = classOf[Process.PipeSink].getDeclaredField("pipe")
      field.setAccessible(true)
      val pipe = field.get(this).asInstanceOf[PipedInputStream]
      !this.isAlive && throwsIOException { pipe.read() }
    }
  }

  /** `Process.PipeSource` with extra probes for the tests. */
  class PipeSource extends Process.PipeSource("TestPipeSource") {
    /** Polls (sleeping 1 ms per round) until `source.size()` is zero.
      * NOTE(review): presumably this means the thread has picked up the connected stream —
      * confirm against `Process.PipeSource`.
      */
    def ensureRunloopStarted(): Unit = {
      while (source.size() > 0) {
        Thread.sleep(1)
      }
    }

    /** True when this thread is no longer alive and writing to the `pipe` field (fetched
      * reflectively from `Process.PipeSource`) fails with an `IOException`.
      */
    def isReleased: Boolean = {
      val field = classOf[Process.PipeSource].getDeclaredField("pipe")
      field.setAccessible(true)
      val pipe = field.get(this).asInstanceOf[PipedOutputStream]
      !this.isAlive && throwsIOException { pipe.write(1) }
    }
  }

  /** Mixin that records whether `close()` has been called. */
  trait CloseChecking extends Closeable {
    var closed = false
    override def close(): Unit = { closed = true }
  }
  /** In-memory output stream that records `close()`. */
  class DebugOutputStream extends ByteArrayOutputStream with CloseChecking
  /** In-memory input stream over the bytes of `s` that records `close()`. */
  class DebugInputStream(s: String) extends ByteArrayInputStream(s.getBytes()) with CloseChecking
  /** Input stream that never reaches end-of-stream (`read()` always yields 1) and records `close()`. */
  class DebugInfinityInputStream extends InputStream with CloseChecking {
    def read(): Int = 1
  }

  /** Creates a source and a sink, connects the source's output to the sink, and starts both threads. */
  def sourceSink(): (PipeSource, PipeSink) = {
    val source = new PipeSource
    val sink = new PipeSink
    source connectOut sink
    source.start()
    sink.start()
    (source, sink)
  }

  // PipeSource and PipeSink must release resources when they end normally
  @Test
  def normallyEnd(): Unit = {
    val in = new DebugInputStream("aaa")
    val (source, sink) = sourceSink()
    val out = new DebugOutputStream
    source connectIn in
    sink connectOut out
    val f = Future {
      source.join()
      sink.join()
    }
    Await.result(f, TestDuration.Standard)
    assert(in.closed)
    assert(out.closed)
    assert(source.isReleased)
    assert(sink.isReleased)
  }

  // PipeSource and PipeSink must release resources when interrupted while waiting for source.take()
  @Test
  def sourceInterrupted(): Unit = {
    val (source, sink) = sourceSink()
    val out = new DebugOutputStream
    sink connectOut out
    val f = Future {
      sink.ensureRunloopStarted()
      source.release()
      sink.release()
    }
    Await.result(f, TestDuration.Standard)
    assert(out.closed)
    assert(source.isReleased)
    assert(sink.isReleased)
  }

  // PipeSource and PipeSink must release resources when interrupted while waiting for sink.take()
  @Test
  def sinkInterrupted(): Unit = {
    val in = new DebugInputStream("aaa")
    val (source, sink) = sourceSink()
    source connectIn in
    val f = Future {
      source.ensureRunloopStarted()
      source.release()
      sink.release()
    }
    Await.result(f, TestDuration.Standard)
    assert(in.closed)
    assert(source.isReleased)
    assert(sink.isReleased)
  }

  // PipeSource and PipeSink must release resources when interrupted while copying streams
  @Test
  def runloopInterrupted(): Unit = {
    val in = new DebugInfinityInputStream
    val (source, sink) = sourceSink()
    val out = new DebugOutputStream
    source connectIn in
    sink connectOut out
    val f = Future {
      source.ensureRunloopStarted()
      sink.ensureRunloopStarted()
      source.release()
      sink.release()
    }
    Await.result(f, TestDuration.Standard)
    assert(in.closed)
    assert(out.closed)
    assert(source.isReleased)
    assert(sink.isReleased)
  }
}