summaryrefslogtreecommitdiff
path: root/src/library/scala/concurrent/impl
diff options
context:
space:
mode:
authorphaller <hallerp@gmail.com>2012-05-22 13:52:18 +0200
committerphaller <hallerp@gmail.com>2012-05-24 18:10:44 +0200
commit1dfce90246f7d334e34d110afb8b1517180995fc (patch)
treedf12cb3e8593622e88d46c4af7e5eb987c116e55 /src/library/scala/concurrent/impl
parente490b02476769310765a8d61da656b535d21c56e (diff)
downloadscala-1dfce90246f7d334e34d110afb8b1517180995fc.tar.gz
scala-1dfce90246f7d334e34d110afb8b1517180995fc.tar.bz2
scala-1dfce90246f7d334e34d110afb8b1517180995fc.zip
Move implicit ExecutionContext to be determined by lexical scope
Port of a pull request originally submitted by @havocp. - declare the invariant that all app callbacks have an associated ExecutionContext provided at the place the callback is passed to a method on Future - always run callbacks in their associated EC - since all callbacks have their own EC, Promise does not need one - "internal" callbacks don't need to defer execution either since we know the ultimate app callback will do so, therefore we can use an immediate executor for these
Diffstat (limited to 'src/library/scala/concurrent/impl')
-rw-r--r--src/library/scala/concurrent/impl/Future.scala2
-rw-r--r--src/library/scala/concurrent/impl/Promise.scala21
2 files changed, 13 insertions, 10 deletions
diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala
index a54e81bd05..47534e398b 100644
--- a/src/library/scala/concurrent/impl/Future.scala
+++ b/src/library/scala/concurrent/impl/Future.scala
@@ -17,8 +17,6 @@ import scala.collection.mutable.Stack
private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awaitable[T] {
- implicit def executor: ExecutionContext
-
}
private[concurrent] object Future {
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala
index 78053f5117..1d573ef818 100644
--- a/src/library/scala/concurrent/impl/Promise.scala
+++ b/src/library/scala/concurrent/impl/Promise.scala
@@ -42,7 +42,7 @@ object Promise {
/** Default promise implementation.
*/
- class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] { self =>
+ class DefaultPromise[T] extends AbstractPromise with Promise[T] { self =>
updateState(null, Nil) // Start at "No callbacks" //FIXME switch to Unsafe instead of ARFU
protected final def tryAwait(atMost: Duration): Boolean = {
@@ -108,21 +108,26 @@ object Promise {
}) match {
case null => false
case cs if cs.isEmpty => true
- case cs => Future.dispatchFuture(executor, () => cs.foreach(f => notifyCompleted(f, resolved))); true
+ // this assumes that f(resolved) will go via dispatchFuture
+ // and notifyCompleted (see onComplete below)
+ case cs => cs.foreach(f => f(resolved)); true
}
}
- def onComplete[U](func: Either[Throwable, T] => U): Unit = {
+ def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = {
+ val bound: Either[Throwable, T] => Unit = (either: Either[Throwable, T]) =>
+ Future.dispatchFuture(executor, () => notifyCompleted(func, either))
+
@tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed
def dispatchOrAddCallback(): Unit =
getState match {
- case r: Either[_, _] => Future.dispatchFuture(executor, () => notifyCompleted(func, r.asInstanceOf[Either[Throwable, T]]))
- case listeners: List[_] => if (updateState(listeners, func :: listeners)) () else dispatchOrAddCallback()
+ case r: Either[_, _] => bound(r.asInstanceOf[Either[Throwable, T]])
+ case listeners: List[_] => if (updateState(listeners, bound :: listeners)) () else dispatchOrAddCallback()
}
dispatchOrAddCallback()
}
- private final def notifyCompleted(func: Either[Throwable, T] => Any, result: Either[Throwable, T]) {
+ private final def notifyCompleted(func: Either[Throwable, T] => Any, result: Either[Throwable, T])(implicit executor: ExecutionContext) {
try {
func(result)
} catch {
@@ -135,7 +140,7 @@ object Promise {
*
* Useful in Future-composition when a value to contribute is already available.
*/
- final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val executor: ExecutionContext) extends Promise[T] {
+ final class KeptPromise[T](suppliedValue: Either[Throwable, T]) extends Promise[T] {
val value = Some(resolveEither(suppliedValue))
@@ -143,7 +148,7 @@ object Promise {
def tryComplete(value: Either[Throwable, T]): Boolean = false
- def onComplete[U](func: Either[Throwable, T] => U): Unit = {
+ def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = {
val completedAs = value.get // Avoid closing over "this"
Future.dispatchFuture(executor, () => func(completedAs))
}