summaryrefslogblamecommitdiff
path: root/src/library/scala/sys/process/ProcessImpl.scala
blob: aebce5ac94c7c712b4f2fe247fbe8e44eaf26764 (plain) (tree)







































































































































































































































                                                                                                                                                     
/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2003-2010, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */

package scala.sys
package process

import processAliases._
import java.io.{ FilterInputStream, FilterOutputStream, PipedInputStream, PipedOutputStream }
import java.util.concurrent.LinkedBlockingQueue

/** Starts a fresh thread that evaluates the supplied code and hands the started Thread back. */
private object Spawn {
	/** Evaluates `f` on a new non-daemon thread. */
	def apply(f: => Unit): Thread = apply(f, daemon = false)

	/** Evaluates `f` on a new thread whose daemon flag is `daemon`.
	 *  The thread has already been started when this method returns.
	 */
	def apply(f: => Unit, daemon: Boolean): Thread = {
		val worker = new Thread { override def run(): Unit = f }
		worker setDaemon daemon
		worker.start()
		worker
	}
}
/** Evaluates a computation on a freshly spawned thread and exposes its outcome through a blocking accessor. */
private object Future {
	/** Starts evaluating `f` on a new thread (created through `Spawn`) immediately.
	 *
	 *  @param f the computation to run asynchronously
	 *  @return a function that blocks until `f` has completed and then either returns the value
	 *          `f` produced or rethrows whatever `f` threw
	 */
	def apply[T](f: => T): () => T = {
		val result = new SyncVar[Either[Throwable, T]]
		// Every Throwable is captured, not only Exception: this is the outermost frame of the worker
		// thread, and if an Error (AssertionError, StackOverflowError, ...) escaped here `result`
		// would never be set and each caller of the returned function would block forever.
		// The failure is not swallowed; it is handed over and rethrown in the caller's thread.
		def run: Unit =
			try result.set(Right(f))
			catch { case e: Throwable => result set Left(e) }

		Spawn(run)

		() => result.get match {
			case Right(value)    => value
			case Left(exception) => throw exception
		}
	}
}
/** Helpers that shield a stream from being closed by the code it is handed to. */
object Uncloseable {
	/** Wraps `in` so that calling `close()` on the wrapper does nothing and `in` stays open. */
	def apply(in: InputStream): InputStream =
		new FilterInputStream(in) { override def close(): Unit = () }

	/** Wraps `out` so that calling `close()` on the wrapper does nothing and `out` stays open. */
	def apply(out: OutputStream): OutputStream =
		new FilterOutputStream(out) { override def close(): Unit = () }

	/** Returns a non-closing wrapper when `in` is `System.in`; any other stream is returned unchanged. */
	def protect(in: InputStream): InputStream =
		if (in ne System.in) in else Uncloseable(in)

	/** Returns a non-closing wrapper when `out` is `System.out` or `System.err`;
	 *  any other stream is returned unchanged.
	 */
	def protect(out: OutputStream): OutputStream = {
		val isStandardStream = (out eq System.out) || (out eq System.err)
		if (isStandardStream) Uncloseable(out) else out
	}
}

/** Sequential composition in which `b` is started only if `a` finished with exit code zero. */
private class AndProcess(
  a: ProcessBuilder,
  b: ProcessBuilder,
  io: ProcessIO
) extends SequentialProcess(a, b, io, exitCode => exitCode == 0)

/** Sequential composition in which `b` is started only if `a` finished with a nonzero exit code. */
private class OrProcess(
  a: ProcessBuilder,
  b: ProcessBuilder,
  io: ProcessIO
) extends SequentialProcess(a, b, io, exitCode => exitCode != 0)

/** Sequential composition in which `b` is started after `a` regardless of the exit code of `a`. */
private class ProcessSequence(
  a: ProcessBuilder,
  b: ProcessBuilder,
  io: ProcessIO
) extends SequentialProcess(a, b, io, (_: Int) => true)

/** Runs `a` to completion and then, if `evaluateSecondProcess` accepts the exit code of `a`, runs `b`.
 *  Both builders are run with the same `io`.
 */
private class SequentialProcess(
  a: ProcessBuilder,
  b: ProcessBuilder,
  io: ProcessIO,
  evaluateSecondProcess: Int => Boolean
) extends CompoundProcess {

	/** Yields the exit code of `b` when it was run, otherwise the exit code of `a`;
	 *  yields `None` when waiting for either process was interrupted (that process is then destroyed).
	 */
	protected[this] override def runAndExitValue(): Option[Int] = {
		val first = a.run(io)
		runInterruptible(first.exitValue())(first.destroy()) match {
			case Some(firstCode) if evaluateSecondProcess(firstCode) =>
				val second = b.run(io)
				runInterruptible(second.exitValue())(second.destroy())
			case firstOutcome =>
				// Either the wait was interrupted (None) or the second process is not wanted.
				firstOutcome
		}
	}
}

/** A `Process` that additionally declares an explicit `start()` operation for subclasses to implement. */
private abstract class BasicProcess extends Process {
	/** Starts this process; the concrete behaviour is supplied by the subclass. */
	def start(): Unit
}

/** Base class for processes built out of other processes.  The work defined by `runAndExitValue`
 *  is executed on a background thread that is created the first time the lazily initialised
 *  `getExitValue`/`destroyer` pair is touched.
 */
private abstract class CompoundProcess extends BasicProcess {
	/** Interrupts the background thread that executes `runAndExitValue`. */
	def destroy(): Unit = destroyer()

	/** Blocks until the background thread has finished and returns the exit code it recorded;
	 *  fails through `sys.error` when no exit code was recorded.
	 */
	def exitValue(): Int =
		getExitValue() getOrElse sys.error("No exit code: process destroyed.")

	/** Referencing `getExitValue` is enough: it forces the lazy pair below, which launches the thread. */
	def start(): Unit = { getExitValue; () }

	protected lazy val (getExitValue, destroyer) = {
		// Pre-filled with None so that a run ending without storing a result still yields a value.
		val exitCode = new SyncVar[Option[Int]]()
		exitCode set None
		val runner = Spawn(exitCode.set(runAndExitValue()))

		val awaitExitCode = Future { runner.join(); exitCode.get }
		val interruptRunner = () => runner.interrupt()
		(awaitExitCode, interruptRunner)
	}

	/** Starts the work, blocks until an exit value is available and returns it wrapped in Some.
	 *  Returns None if destroyed (use 'run').
	 */
	protected[this] def runAndExitValue(): Option[Int]

	/** Evaluates `action` and wraps its result in Some.  If an InterruptedException is raised while
	 *  doing so, `destroyImpl` is evaluated and None is returned instead.
	 */
	protected[this] def runInterruptible[T](action: => T)(destroyImpl: => Unit): Option[T] =
		try Some(action)
		catch {
			case _: InterruptedException =>
				destroyImpl
				None
		}
}

/** Runs `a` and `b` with a stream of `a` piped into the input of `b`.
 *
 *  A `PipeSource` thread copies the stream obtained from `a` into a `PipedOutputStream`; a `PipeSink`
 *  thread copies from the connected `PipedInputStream` into the input stream obtained from `b`.
 *  `toError` selects whether the handler for `a` is installed with `withError` or with `withOutput`
 *  (presumably the error stream versus the standard output stream of `a` -- confirm against ProcessIO).
 */
private class PipedProcesses(a: ProcessBuilder, b: ProcessBuilder, defaultIO: ProcessIO, toError: Boolean) extends CompoundProcess {
	/** Returns Some(exit code of `b`), or None when the wait was interrupted, in which case both
	 *  processes are destroyed.  Both pipe ends are closed on the way out in either case.
	 */
	protected[this] override def runAndExitValue() = {
		// Producer side: the source thread waits on `currentSource` for a stream to copy into the pipe.
		val currentSource = new SyncVar[Option[InputStream]]
		val pipeOut       = new PipedOutputStream
		val source        = new PipeSource(currentSource, pipeOut, a.toString)
		source.start()

		// Consumer side: the sink thread waits on `currentSink` for a stream to copy the pipe into.
		val pipeIn      = new PipedInputStream(pipeOut)
		val currentSink = new SyncVar[Option[OutputStream]]
		val sink        = new PipeSink(pipeIn, currentSink, b.toString)
		sink.start()

		// Hands the stream received from `a` over to the source thread.
		def handleOutOrError(fromOutput: InputStream) = currentSource put Some(fromOutput)

		val firstIO =
			if (toError)
				defaultIO.withError(handleOutOrError)
			else
				defaultIO.withOutput(handleOutOrError)
		// Hands the input stream of `b` over to the sink thread.
		val secondIO = defaultIO.withInput(toInput => currentSink put Some(toInput))

		// Note the order: `b` is started before `a`.
		val second = b.run(secondIO)
		val first = a.run(firstIO)
		try {
			runInterruptible {
				first.exitValue
				// None is the signal on which the PipeSource and PipeSink loops terminate.
				currentSource put None
				currentSink put None
				val result = second.exitValue
				result
			} {
				// Evaluated only when the wait above was interrupted.
				first.destroy()
				second.destroy()
			}
		}
		finally {
			BasicIO.close(pipeIn)
			BasicIO.close(pipeOut)
		}
	}
}
/** Thread that feeds `pipe` from the input streams published through `currentSource`.
 *
 *  Each `Some(stream)` is copied into `pipe` in full, closed, and the variable is unset before the
 *  next value is awaited.  A `None` ends the thread after unsetting the variable and closing `pipe`.
 *  `label` is only evaluated when an I/O error has to be reported.
 */
private class PipeSource(currentSource: SyncVar[Option[InputStream]], pipe: PipedOutputStream, label: => String) extends Thread {
	final override def run(): Unit =
		currentSource.get match {
			case None =>
				currentSource.unset()
				BasicIO.close(pipe)
			case Some(upstream) =>
				try BasicIO.transferFully(upstream, pipe)
				catch {
					case e: IOException =>
						println("I/O error " + e.getMessage + " for process: " + label)
						e.printStackTrace()
				}
				finally {
					BasicIO.close(upstream)
					currentSource.unset()
				}
				// Wait for the next stream (or the terminating None).
				run()
		}
}
/** Thread that drains `pipe` into the output streams published through `currentSink`.
 *
 *  For each `Some(stream)` the contents of `pipe` are copied into it in full, the stream is closed,
 *  and the variable is unset before the next value is awaited.  A `None` ends the thread after
 *  unsetting the variable.  `label` is only evaluated when an I/O error has to be reported.
 */
private class PipeSink(pipe: PipedInputStream, currentSink: SyncVar[Option[OutputStream]], label: => String) extends Thread {
	final override def run(): Unit =
		currentSink.get match {
			case None =>
				currentSink.unset()
			case Some(downstream) =>
				try BasicIO.transferFully(pipe, downstream)
				catch {
					case e: IOException =>
						println("I/O error " + e.getMessage + " for process: " + label)
						e.printStackTrace()
				}
				finally {
					BasicIO.close(downstream)
					currentSink.unset()
				}
				// Wait for the next stream (or the terminating None).
				run()
		}
}
/** A `Process` that is not backed by an operating-system process: `action` is evaluated
 *  asynchronously through `Future` as soon as the instance is created, and the Int it yields is
 *  reported as the exit value.  `destroy()` has no effect.
 */
private class DummyProcess(action: => Int) extends Process {
	private[this] val pendingExitCode = Future(action)

	/** Blocks until `action` has completed and returns its result. */
	override def exitValue(): Int = pendingExitCode()

	/** Does nothing. */
	override def destroy(): Unit = ()
}
/** Thin wrapper around a java.lang.Process.
 *
 *  `inputThread` is the thread created to write to the input stream of the process and
 *  `outputThreads` are the threads created to read from its output and error streams.
 *  `exitValue` waits for the process, interrupts `inputThread` and then joins every thread in
 *  `outputThreads` before returning.
 */
private class SimpleProcess(p: JProcess, inputThread: Thread, outputThreads: List[Thread]) extends Process {
	override def exitValue(): Int = {
		// Block until the process terminates; in any case tell the input thread that it may stop.
		try p.waitFor()
		finally inputThread.interrupt()

		// waitFor alone does not ensure that all output is complete, so wait for the readers as well.
		for (reader <- outputThreads) reader.join()
		p.exitValue()
	}

	/** Destroys the underlying process and, whatever happens, interrupts the input thread. */
	override def destroy(): Unit =
		try p.destroy()
		finally inputThread.interrupt()
}
/** A `Process` view of a thread: the exit value is 0 when `success` holds true and 1 otherwise. */
private final class ThreadProcess(thread: Thread, success: SyncVar[Boolean]) extends Process {
	/** Waits for the thread to finish and translates the value of `success` into an exit code. */
	override def exitValue(): Int = {
		thread.join()
		success.get match {
			case true  => 0
			case false => 1
		}
	}

	/** Interrupts the thread. */
	override def destroy(): Unit = thread.interrupt()
}

private object Streamed {
	/** Builds a `Streamed` whose three functions communicate through one blocking queue.
	 *
	 *  `process` enqueues an element, `done` enqueues an exit code, and `stream` produces a lazy
	 *  Stream that blocks on the queue for each element and ends when an exit code is dequeued.
	 *
	 *  @param nonzeroException when true, dequeuing a nonzero exit code raises an error through
	 *                          `sys.error` instead of ending the stream
	 */
	def apply[T](nonzeroException: Boolean): Streamed[T] = {
		val queue = new LinkedBlockingQueue[Either[Int, T]]
		def drain(): Stream[T] = queue.take() match {
			case Right(item) => Stream.cons(item, drain())
			case Left(code)  =>
				if (nonzeroException && code != 0) sys.error("Nonzero exit code: " + code)
				else Stream.empty
		}
		new Streamed[T](
			(item: T) => queue.put(Right(item)),
			(code: Int) => queue.put(Left(code)),
			() => drain()
		)
	}
}
/** Bundle of the three functions created by `Streamed.apply`: `process` accepts an element,
 *  `done` accepts the final exit code, and `stream` yields the elements as a Stream.
 */
private final class Streamed[T](
	val process: T => Unit,
	val done: Int => Unit,
	val stream: () => Stream[T]
) extends NotNull