summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorphaller <hallerp@gmail.com>2012-08-05 00:20:01 +0200
committerphaller <hallerp@gmail.com>2012-08-05 00:20:01 +0200
commit5b82a9702de3ffd5b131caf8c550877b476e8f9c (patch)
tree86c8e93115815b8b6aa8b2322d0dc33cb26bfcd5
parent3cb0e784a05db7d0b542cec9bf4c5fbf3772a6cf (diff)
downloadscala-5b82a9702de3ffd5b131caf8c550877b476e8f9c.tar.gz
scala-5b82a9702de3ffd5b131caf8c550877b476e8f9c.tar.bz2
scala-5b82a9702de3ffd5b131caf8c550877b476e8f9c.zip
SI-6185 - add "prepare" hook to ExecutionContext
Enables important abstractions to be built on top of futures, such as Twitter's "Local" for handling data local to a callback chain.
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala8
-rw-r--r--src/library/scala/concurrent/impl/Promise.scala12
-rw-r--r--test/files/jvm/scala-concurrent-tck.scala61
3 files changed, 78 insertions, 3 deletions
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
index 8081bb32da..ac462ac9d2 100644
--- a/src/library/scala/concurrent/ExecutionContext.scala
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -12,6 +12,7 @@ package scala.concurrent
import java.util.concurrent.{ ExecutorService, Executor }
import scala.concurrent.util.Duration
import scala.annotation.implicitNotFound
+import scala.util.Try
/**
* An `ExecutionContext` is an abstraction over an entity that can execute program logic.
@@ -27,6 +28,13 @@ trait ExecutionContext {
*/
def reportFailure(t: Throwable): Unit
+ /** Prepares for the execution of callback `f`. Returns the prepared
+ * execution context which should be used to schedule the execution
+ * of the task associated with `f`.
+ */
+ def prepare[T, U](f: Try[T] => U): ExecutionContext =
+ this
+
}
/**
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala
index fab6b55c52..9f30529dc8 100644
--- a/src/library/scala/concurrent/impl/Promise.scala
+++ b/src/library/scala/concurrent/impl/Promise.scala
@@ -22,7 +22,9 @@ private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with sc
def future: this.type = this
}
-private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: (Try[T]) => Any) extends Runnable with OnCompleteRunnable {
+/* Precondition: `executor` is prepared, i.e., `executor` has been returned from invocation of `prepare` on some other `ExecutionContext`.
+ */
+private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: Try[T] => Any) extends Runnable with OnCompleteRunnable {
// must be filled in before running it
var value: Try[T] = null
@@ -34,6 +36,8 @@ private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete
def executeWithValue(v: Try[T]): Unit = {
require(value eq null) // can't complete it twice
value = v
+ // Note that we cannot prepare the ExecutionContext at this point, since we might
+ // already be running on a different thread!
executor.execute(this)
}
}
@@ -125,7 +129,8 @@ private[concurrent] object Promise {
}
def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
- val runnable = new CallbackRunnable[T](executor, func)
+ val preparedEC = executor.prepare(func)
+ val runnable = new CallbackRunnable[T](preparedEC, func)
@tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed
def dispatchOrAddCallback(): Unit =
@@ -151,7 +156,8 @@ private[concurrent] object Promise {
def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
val completedAs = value.get
- (new CallbackRunnable(executor, func)).executeWithValue(completedAs)
+ val preparedEC = executor.prepare(func)
+ (new CallbackRunnable(preparedEC, func)).executeWithValue(completedAs)
}
def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this
diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala
index 36ab910593..976d98a337 100644
--- a/test/files/jvm/scala-concurrent-tck.scala
+++ b/test/files/jvm/scala-concurrent-tck.scala
@@ -926,6 +926,66 @@ trait CustomExecutionContext extends TestBase {
testCallbackChainCustomEC()
}
+trait ExecutionContextPrepare extends TestBase {
+ val theLocal = new ThreadLocal[String] {
+ override protected def initialValue(): String = ""
+ }
+
+ class PreparingExecutionContext extends ExecutionContext {
+ def delegate = ExecutionContext.global
+
+ override def execute(runnable: Runnable): Unit =
+ delegate.execute(runnable)
+
+ override def prepare[T, U](f: Try[T] => U): ExecutionContext = {
+ // save object stored in ThreadLocal storage
+ val localData = theLocal.get
+ new PreparingExecutionContext {
+ override def execute(runnable: Runnable): Unit = {
+ val wrapper = new Runnable {
+ override def run(): Unit = {
+ // now we're on the new thread
+ // put localData into theLocal
+ theLocal.set(localData)
+ runnable.run()
+ }
+ }
+ delegate.execute(wrapper)
+ }
+ }
+ }
+
+ override def reportFailure(t: Throwable): Unit =
+ delegate.reportFailure(t)
+ }
+
+ implicit val ec = new PreparingExecutionContext
+
+ def testOnComplete(): Unit = once {
+ done =>
+ theLocal.set("secret")
+ val fut = future { 42 }
+ fut onComplete {
+ case _ =>
+ assert(theLocal.get == "secret")
+ done()
+ }
+ }
+
+ def testMap(): Unit = once {
+ done =>
+ theLocal.set("secret2")
+ val fut = future { 42 }
+ fut map { x =>
+ assert(theLocal.get == "secret2")
+ done()
+ }
+ }
+
+ testOnComplete()
+ testMap()
+}
+
object Test
extends App
with FutureCallbacks
@@ -935,6 +995,7 @@ with Promises
with BlockContexts
with Exceptions
with CustomExecutionContext
+with ExecutionContextPrepare
{
System.exit(0)
}