diff options
author | phaller <hallerp@gmail.com> | 2012-05-22 13:52:18 +0200 |
---|---|---|
committer | phaller <hallerp@gmail.com> | 2012-05-24 18:10:44 +0200 |
commit | 1dfce90246f7d334e34d110afb8b1517180995fc (patch) | |
tree | df12cb3e8593622e88d46c4af7e5eb987c116e55 /src/library/scala/concurrent/impl | |
parent | e490b02476769310765a8d61da656b535d21c56e (diff) | |
download | scala-1dfce90246f7d334e34d110afb8b1517180995fc.tar.gz scala-1dfce90246f7d334e34d110afb8b1517180995fc.tar.bz2 scala-1dfce90246f7d334e34d110afb8b1517180995fc.zip |
Move implicit ExecutionContext to be determined by lexical scope
Port of a pull request originally submitted by @havocp.
- declare the invariant that all app callbacks have an
associated ExecutionContext provided at the place
the callback is passed to a method on Future
- always run callbacks in their associated EC
- since all callbacks have their own EC, Promise
does not need one
- "internal" callbacks don't need to defer execution either
since we know the ultimate app callback will do so,
therefore we can use an immediate executor for these
Diffstat (limited to 'src/library/scala/concurrent/impl')
-rw-r--r-- | src/library/scala/concurrent/impl/Future.scala | 2 | ||||
-rw-r--r-- | src/library/scala/concurrent/impl/Promise.scala | 21 |
2 files changed, 13 insertions, 10 deletions
diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala index a54e81bd05..47534e398b 100644 --- a/src/library/scala/concurrent/impl/Future.scala +++ b/src/library/scala/concurrent/impl/Future.scala @@ -17,8 +17,6 @@ import scala.collection.mutable.Stack private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awaitable[T] { - implicit def executor: ExecutionContext - } private[concurrent] object Future { diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index 78053f5117..1d573ef818 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -42,7 +42,7 @@ object Promise { /** Default promise implementation. */ - class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] { self => + class DefaultPromise[T] extends AbstractPromise with Promise[T] { self => updateState(null, Nil) // Start at "No callbacks" //FIXME switch to Unsafe instead of ARFU protected final def tryAwait(atMost: Duration): Boolean = { @@ -108,21 +108,26 @@ object Promise { }) match { case null => false case cs if cs.isEmpty => true - case cs => Future.dispatchFuture(executor, () => cs.foreach(f => notifyCompleted(f, resolved))); true + // this assumes that f(resolved) will go via dispatchFuture + // and notifyCompleted (see onComplete below) + case cs => cs.foreach(f => f(resolved)); true } } - def onComplete[U](func: Either[Throwable, T] => U): Unit = { + def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = { + val bound: Either[Throwable, T] => Unit = (either: Either[Throwable, T]) => + Future.dispatchFuture(executor, () => notifyCompleted(func, either)) + @tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed def dispatchOrAddCallback(): Unit = getState match { - case r: Either[_, _] => Future.dispatchFuture(executor, () => notifyCompleted(func, r.asInstanceOf[Either[Throwable, T]])) - case listeners: List[_] => if (updateState(listeners, func :: listeners)) () else dispatchOrAddCallback() + case r: Either[_, _] => bound(r.asInstanceOf[Either[Throwable, T]]) + case listeners: List[_] => if (updateState(listeners, bound :: listeners)) () else dispatchOrAddCallback() } dispatchOrAddCallback() } - private final def notifyCompleted(func: Either[Throwable, T] => Any, result: Either[Throwable, T]) { + private final def notifyCompleted(func: Either[Throwable, T] => Any, result: Either[Throwable, T])(implicit executor: ExecutionContext) { try { func(result) } catch { @@ -135,7 +140,7 @@ object Promise { * * Useful in Future-composition when a value to contribute is already available. */ - final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val executor: ExecutionContext) extends Promise[T] { + final class KeptPromise[T](suppliedValue: Either[Throwable, T]) extends Promise[T] { val value = Some(resolveEither(suppliedValue)) @@ -143,7 +148,7 @@ object Promise { def tryComplete(value: Either[Throwable, T]): Boolean = false - def onComplete[U](func: Either[Throwable, T] => U): Unit = { + def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = { val completedAs = value.get // Avoid closing over "this" Future.dispatchFuture(executor, () => func(completedAs)) } |