summaryrefslogtreecommitdiff
path: root/src/library/scala/concurrent/ExecutionContext.scala
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2011-12-05 14:33:42 +0100
committerPhilipp Haller <hallerp@gmail.com>2011-12-05 14:33:42 +0100
commitd38ca89f8d50c0d6eafac623f0edc165c01f8bd4 (patch)
tree5cae8a745eaeffde2f807da13af6971bdf80d130 /src/library/scala/concurrent/ExecutionContext.scala
parenta289465c70630719cbd3a74edf5502a156ef83c4 (diff)
downloadscala-d38ca89f8d50c0d6eafac623f0edc165c01f8bd4.tar.gz
scala-d38ca89f8d50c0d6eafac623f0edc165c01f8bd4.tar.bz2
scala-d38ca89f8d50c0d6eafac623f0edc165c01f8bd4.zip
Migration of scala.concurrent to clean fork of new Scala repo.
Diffstat (limited to 'src/library/scala/concurrent/ExecutionContext.scala')
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala117
1 files changed, 117 insertions, 0 deletions
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
new file mode 100644
index 0000000000..972a76a95a
--- /dev/null
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -0,0 +1,117 @@
+package scala.concurrent
+
+import java.util.concurrent.{ Executors, Future => JFuture }
+import scala.util.{ Duration, Timeout }
+import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveTask => FJTask, RecursiveAction, ForkJoinWorkerThread }
+
+trait ExecutionContext {
+
+ protected implicit object CanBlockEvidence extends CanBlock
+
+ def execute(task: Runnable): Unit
+
+ def makeTask[T](task: () => T)(implicit timeout: Timeout): Task[T]
+
+ def makePromise[T](timeout: Timeout): Promise[T]
+
+ def blockingCall[T](body: Blockable[T]): T
+
+}
+
+trait Task[T] {
+
+ def start(): Unit
+ def future: Future[T]
+
+}
+
+/* DONE: The challenge is to make ForkJoinPromise inherit from RecursiveAction
+ * to avoid an object allocation per promise. This requires turning DefaultPromise
+ * into a trait, i.e., removing its constructor parameters.
+ */
+private[concurrent] class ForkJoinTaskImpl[T](context: ForkJoinExecutionContext, body: () => T, within: Timeout) extends FJTask[T] with Task[T] {
+
+ val timeout = within
+ implicit val dispatcher = context
+
+ // body of RecursiveTask
+ def compute(): T =
+ body()
+
+ def start(): Unit =
+ fork()
+
+ def future: Future[T] = {
+ null
+ }
+
+ // TODO FIXME: handle timeouts
+ def await(atMost: Duration): this.type =
+ await
+
+ def await: this.type = {
+ this.join()
+ this
+ }
+
+ def tryCancel(): Unit =
+ tryUnfork()
+}
+
+private[concurrent] final class ForkJoinExecutionContext extends ExecutionContext {
+ val pool = new ForkJoinPool
+
+ @inline
+ private def executeForkJoinTask(task: RecursiveAction) {
+ if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread])
+ task.fork()
+ else
+ pool execute task
+ }
+
+ def execute(task: Runnable) {
+ val action = new RecursiveAction { def compute() { task.run() } }
+ executeForkJoinTask(action)
+ }
+
+ def makeTask[T](body: () => T)(implicit timeout: Timeout): Task[T] = {
+ new ForkJoinTaskImpl(this, body, timeout)
+ }
+
+ def makePromise[T](timeout: Timeout): Promise[T] =
+ null
+
+ def blockingCall[T](body: Blockable[T]): T =
+ body.block()(CanBlockEvidence)
+
+}
+
+/**
+ * Implements a blocking execution context
+ */
+/*
+private[concurrent] class BlockingExecutionContext extends ExecutionContext {
+ //val pool = makeCachedThreadPool // TODO FIXME: need to merge thread pool factory methods from Heather's parcolls repo
+
+ def execute(task: Runnable) {
+ /* TODO
+ val p = newPromise(task.run())
+ p.start()
+ pool execute p
+ */
+ }
+
+ // TODO FIXME: implement
+ def newPromise[T](body: => T): Promise[T] = {
+ throw new Exception("not yet implemented")
+ }
+}
+*/
+
+object ExecutionContext {
+
+ lazy val forNonBlocking = new ForkJoinExecutionContext
+
+ //lazy val forBlocking = new BlockingExecutionContext
+
+}