diff options
author | Philipp Haller <hallerp@gmail.com> | 2011-12-05 14:33:42 +0100 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2011-12-05 14:33:42 +0100 |
commit | d38ca89f8d50c0d6eafac623f0edc165c01f8bd4 (patch) | |
tree | 5cae8a745eaeffde2f807da13af6971bdf80d130 /src/library/scala/concurrent/ExecutionContext.scala | |
parent | a289465c70630719cbd3a74edf5502a156ef83c4 (diff) | |
download | scala-d38ca89f8d50c0d6eafac623f0edc165c01f8bd4.tar.gz scala-d38ca89f8d50c0d6eafac623f0edc165c01f8bd4.tar.bz2 scala-d38ca89f8d50c0d6eafac623f0edc165c01f8bd4.zip |
Migration of scala.concurrent to clean fork of new Scala repo.
Diffstat (limited to 'src/library/scala/concurrent/ExecutionContext.scala')
-rw-r--r-- | src/library/scala/concurrent/ExecutionContext.scala | 117 |
1 files changed, 117 insertions, 0 deletions
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala new file mode 100644 index 0000000000..972a76a95a --- /dev/null +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -0,0 +1,117 @@ +package scala.concurrent + +import java.util.concurrent.{ Executors, Future => JFuture } +import scala.util.{ Duration, Timeout } +import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveTask => FJTask, RecursiveAction, ForkJoinWorkerThread } + +trait ExecutionContext { + + protected implicit object CanBlockEvidence extends CanBlock + + def execute(task: Runnable): Unit + + def makeTask[T](task: () => T)(implicit timeout: Timeout): Task[T] + + def makePromise[T](timeout: Timeout): Promise[T] + + def blockingCall[T](body: Blockable[T]): T + +} + +trait Task[T] { + + def start(): Unit + def future: Future[T] + +} + +/* DONE: The challenge is to make ForkJoinPromise inherit from RecursiveAction + * to avoid an object allocation per promise. This requires turning DefaultPromise + * into a trait, i.e., removing its constructor parameters. + */ +private[concurrent] class ForkJoinTaskImpl[T](context: ForkJoinExecutionContext, body: () => T, within: Timeout) extends FJTask[T] with Task[T] { + + val timeout = within + implicit val dispatcher = context + + // body of RecursiveTask + def compute(): T = + body() + + def start(): Unit = + fork() + + def future: Future[T] = { + null + } + + // TODO FIXME: handle timeouts + def await(atMost: Duration): this.type = + await + + def await: this.type = { + this.join() + this + } + + def tryCancel(): Unit = + tryUnfork() +} + +private[concurrent] final class ForkJoinExecutionContext extends ExecutionContext { + val pool = new ForkJoinPool + + @inline + private def executeForkJoinTask(task: RecursiveAction) { + if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) + task.fork() + else + pool execute task + } + + def execute(task: Runnable) { + val action = new RecursiveAction { def compute() { task.run() } } + executeForkJoinTask(action) + } + + def makeTask[T](body: () => T)(implicit timeout: Timeout): Task[T] = { + new ForkJoinTaskImpl(this, body, timeout) + } + + def makePromise[T](timeout: Timeout): Promise[T] = + null + + def blockingCall[T](body: Blockable[T]): T = + body.block()(CanBlockEvidence) + +} + +/** + * Implements a blocking execution context + */ +/* +private[concurrent] class BlockingExecutionContext extends ExecutionContext { + //val pool = makeCachedThreadPool // TODO FIXME: need to merge thread pool factory methods from Heather's parcolls repo + + def execute(task: Runnable) { + /* TODO + val p = newPromise(task.run()) + p.start() + pool execute p + */ + } + + // TODO FIXME: implement + def newPromise[T](body: => T): Promise[T] = { + throw new Exception("not yet implemented") + } +} +*/ + +object ExecutionContext { + + lazy val forNonBlocking = new ForkJoinExecutionContext + + //lazy val forBlocking = new BlockingExecutionContext + +} |