diff options
author | Heather Miller <heather.miller@epfl.ch> | 2012-08-05 12:43:02 +0200 |
---|---|---|
committer | Heather Miller <heather.miller@epfl.ch> | 2012-08-05 12:43:02 +0200 |
commit | 505cc713ec4546c5f9be1065e5fa3f7065451b8f (patch) | |
tree | 1a1b5606bdfdd52dd1be83e682114cb7bab19253 | |
parent | cb6066ee6136d78ceb8c3b5c06cefac998966dd0 (diff) | |
parent | 5b82a9702de3ffd5b131caf8c550877b476e8f9c (diff) | |
download | scala-505cc713ec4546c5f9be1065e5fa3f7065451b8f.tar.gz scala-505cc713ec4546c5f9be1065e5fa3f7065451b8f.tar.bz2 scala-505cc713ec4546c5f9be1065e5fa3f7065451b8f.zip |
Merge branch 'try-based-futures' of https://github.com/heathermiller/scala into try-based-futures
-rw-r--r-- | src/library/scala/concurrent/ExecutionContext.scala | 8 | ||||
-rw-r--r-- | src/library/scala/concurrent/impl/Promise.scala | 12 | ||||
-rw-r--r-- | test/files/jvm/scala-concurrent-tck.scala | 61 |
3 files changed, 78 insertions, 3 deletions
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala index 8081bb32da..ac462ac9d2 100644 --- a/src/library/scala/concurrent/ExecutionContext.scala +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -12,6 +12,7 @@ package scala.concurrent import java.util.concurrent.{ ExecutorService, Executor } import scala.concurrent.util.Duration import scala.annotation.implicitNotFound +import scala.util.Try /** * An `ExecutionContext` is an abstraction over an entity that can execute program logic. @@ -27,6 +28,13 @@ trait ExecutionContext { */ def reportFailure(t: Throwable): Unit + /** Prepares for the execution of callback `f`. Returns the prepared + * execution context which should be used to schedule the execution + * of the task associated with `f`. + */ + def prepare[T, U](f: Try[T] => U): ExecutionContext = + this + } /** diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index fab6b55c52..9f30529dc8 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -22,7 +22,9 @@ private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with sc def future: this.type = this } -private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: (Try[T]) => Any) extends Runnable with OnCompleteRunnable { +/* Precondition: `executor` is prepared, i.e., `executor` has been returned from invocation of `prepare` on some other `ExecutionContext`. + */ +private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: Try[T] => Any) extends Runnable with OnCompleteRunnable { // must be filled in before running it var value: Try[T] = null @@ -34,6 +36,8 @@ private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete def executeWithValue(v: Try[T]): Unit = { require(value eq null) // can't complete it twice value = v + // Note that we cannot prepare the ExecutionContext at this point, since we might + // already be running on a different thread! executor.execute(this) } } @@ -125,7 +129,8 @@ private[concurrent] object Promise { } def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = { - val runnable = new CallbackRunnable[T](executor, func) + val preparedEC = executor.prepare(func) + val runnable = new CallbackRunnable[T](preparedEC, func) @tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed def dispatchOrAddCallback(): Unit = @@ -151,7 +156,8 @@ private[concurrent] object Promise { def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = { val completedAs = value.get - (new CallbackRunnable(executor, func)).executeWithValue(completedAs) + val preparedEC = executor.prepare(func) + (new CallbackRunnable(preparedEC, func)).executeWithValue(completedAs) } def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala index 36ab910593..976d98a337 100644 --- a/test/files/jvm/scala-concurrent-tck.scala +++ b/test/files/jvm/scala-concurrent-tck.scala @@ -926,6 +926,66 @@ trait CustomExecutionContext extends TestBase { testCallbackChainCustomEC() } +trait ExecutionContextPrepare extends TestBase { + val theLocal = new ThreadLocal[String] { + override protected def initialValue(): String = "" + } + + class PreparingExecutionContext extends ExecutionContext { + def delegate = ExecutionContext.global + + override def execute(runnable: Runnable): Unit = + delegate.execute(runnable) + + override def prepare[T, U](f: Try[T] => U): ExecutionContext = { + // save object stored in ThreadLocal storage + val localData = theLocal.get + new PreparingExecutionContext { + override def execute(runnable: Runnable): Unit = { + val wrapper = new Runnable { + override def run(): Unit = { + // now we're on the new thread + // put localData into theLocal + theLocal.set(localData) + runnable.run() + } + } + delegate.execute(wrapper) + } + } + } + + override def reportFailure(t: Throwable): Unit = + delegate.reportFailure(t) + } + + implicit val ec = new PreparingExecutionContext + + def testOnComplete(): Unit = once { + done => + theLocal.set("secret") + val fut = future { 42 } + fut onComplete { + case _ => + assert(theLocal.get == "secret") + done() + } + } + + def testMap(): Unit = once { + done => + theLocal.set("secret2") + val fut = future { 42 } + fut map { x => + assert(theLocal.get == "secret2") + done() + } + } + + testOnComplete() + testMap() +} + object Test extends App with FutureCallbacks @@ -935,6 +995,7 @@ with Promises with BlockContexts with Exceptions with CustomExecutionContext +with ExecutionContextPrepare { System.exit(0) } |