summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorHavoc Pennington <hp@pobox.com>2012-05-04 10:36:58 -0400
committerHavoc Pennington <hp@pobox.com>2012-06-13 16:56:42 -0400
commita3d3a5352919439b0efeb493c6ab3fb8c6bf14b0 (patch)
treeb8f8b843a2432875ef7973f2d2d2a8a742aaac38 /src
parent1c2d466804b22f388ab4d66a034327539ac2e50f (diff)
downloadscala-a3d3a5352919439b0efeb493c6ab3fb8c6bf14b0.tar.gz
scala-a3d3a5352919439b0efeb493c6ab3fb8c6bf14b0.tar.bz2
scala-a3d3a5352919439b0efeb493c6ab3fb8c6bf14b0.zip
rework Future.dispatchFuture a bit to fix bugs / optimize
This fixes a bug where _taskStack could batch a task into the wrong executor, as previously commented in the code. It now uses the _taskStack machinery for the Future.apply dispatch in addition to callback dispatch, so we can batch Future(body) as well. Less significantly, it micro-optimizes by combining some different closures and Runnable into a Task object, so there aren't as many objects created when storing and dispatching a callback. So it saves a bit of memory and runtime perhaps.
Diffstat (limited to 'src')
-rw-r--r--src/library/scala/concurrent/impl/Future.scala106
-rw-r--r--src/library/scala/concurrent/impl/Promise.scala25
2 files changed, 82 insertions, 49 deletions
diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala
index 47534e398b..6a3487adde 100644
--- a/src/library/scala/concurrent/impl/Future.scala
+++ b/src/library/scala/concurrent/impl/Future.scala
@@ -44,13 +44,13 @@ private[concurrent] object Future {
}
def boxedType(c: Class[_]): Class[_] = if (c.isPrimitive) toBoxed(c) else c
-
- def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = {
+
+ private[impl] class PromiseCompletingTask[T](override val executor: ExecutionContext, body: => T)
+ extends Task {
val promise = new Promise.DefaultPromise[T]()
- //TODO: use `dispatchFuture`?
- executor.execute(new Runnable {
- def run = promise complete {
+ protected override def task() = {
+ promise complete {
try Right(body) catch {
case NonFatal(e) =>
// Commenting out reporting for now, since it produces too much output in the tests
@@ -58,9 +58,14 @@ private[concurrent] object Future {
Left(e)
}
}
- })
-
- promise.future
+ }
+ }
+
+ def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = {
+ val task = new PromiseCompletingTask(executor, body)
+ task.dispatch()
+
+ task.promise.future
}
private[impl] val throwableId: Throwable => Throwable = identity _
@@ -70,38 +75,77 @@ private[concurrent] object Future {
// so that it can be stolen from
// OR: a push to the local task queue should be so cheap that this is
// not even needed, but stealing is still possible
- private val _taskStack = new ThreadLocal[Stack[() => Unit]]()
+
+ private[impl] case class TaskStack(stack: Stack[Task], executor: ExecutionContext)
+
+ private val _taskStack = new ThreadLocal[TaskStack]()
+
+ private[impl] trait Task extends Runnable {
+ def executor: ExecutionContext
+
+ // run the original callback (no dispatch)
+ protected def task(): Unit
+
+ // we implement Runnable to avoid creating
+ // an extra object. run() runs ourselves with
+ // a TaskStack pushed, and then runs any
+ // other tasks that show up in the stack.
+ final override def run() = {
+ try {
+ val taskStack = TaskStack(Stack[Task](this), executor)
+ _taskStack set taskStack
+ while (taskStack.stack.nonEmpty) {
+ val next = taskStack.stack.pop()
+ require(next.executor eq executor)
+ try next.task() catch { case NonFatal(e) => executor reportFailure e }
+ }
+ } finally {
+ _taskStack.remove()
+ }
+ }
+
+ // send the task to the running executor.execute() via
+ // _taskStack, or start a new executor.execute()
+ def dispatch(force: Boolean = false): Unit =
+ _taskStack.get match {
+ case stack if (stack ne null) && (executor eq stack.executor) && !force => stack.stack push this
+ case _ => executor.execute(this)
+ }
+ }
+
+ private[impl] class ReleaseTask(override val executor: ExecutionContext, val elems: List[Task])
+ extends Task {
+ protected override def task() = {
+ _taskStack.get.stack.elems = elems
+ }
+ }
private[impl] def releaseStack(executor: ExecutionContext): Unit =
_taskStack.get match {
- case stack if (stack ne null) && stack.nonEmpty =>
- val tasks = stack.elems
- stack.clear()
+ case stack if (stack ne null) && stack.stack.nonEmpty =>
+ val tasks = stack.stack.elems
+ stack.stack.clear()
_taskStack.remove()
- dispatchFuture(executor, () => _taskStack.get.elems = tasks, true)
+ val release = new ReleaseTask(executor, tasks)
+ release.dispatch(force=true)
case null =>
// do nothing - there is no local batching stack anymore
case _ =>
_taskStack.remove()
}
- private[impl] def dispatchFuture(executor: ExecutionContext, task: () => Unit, force: Boolean = false): Unit =
- _taskStack.get match {
- case stack if (stack ne null) && !force => stack push task // FIXME we can't mix tasks aimed for different ExecutionContexts see: https://github.com/akka/akka/blob/v2.0.1/akka-actor/src/main/scala/akka/dispatch/Future.scala#L373
- case _ => executor.execute(new Runnable {
- def run() {
- try {
- val taskStack = Stack[() => Unit](task)
- _taskStack set taskStack
- while (taskStack.nonEmpty) {
- val next = taskStack.pop()
- try next() catch { case NonFatal(e) => executor reportFailure e }
- }
- } finally {
- _taskStack.remove()
- }
- }
- })
+ private[impl] class OnCompleteTask[T](override val executor: ExecutionContext, val onComplete: (Either[Throwable, T]) => Any)
+ extends Task {
+ private var value: Either[Throwable, T] = null
+
+ protected override def task() = {
+ require(value ne null) // dispatch(value) must be called before dispatch()
+ onComplete(value)
}
-
+
+ def dispatch(value: Either[Throwable, T]): Unit = {
+ this.value = value
+ dispatch()
+ }
+ }
}
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala
index 1d573ef818..c5060a2368 100644
--- a/src/library/scala/concurrent/impl/Promise.scala
+++ b/src/library/scala/concurrent/impl/Promise.scala
@@ -94,10 +94,10 @@ object Promise {
val resolved = resolveEither(value)
(try {
@tailrec
- def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] => Unit] = {
+ def tryComplete(v: Either[Throwable, T]): List[Future.OnCompleteTask[T]] = {
getState match {
case raw: List[_] =>
- val cur = raw.asInstanceOf[List[Either[Throwable, T] => Unit]]
+ val cur = raw.asInstanceOf[List[Future.OnCompleteTask[T]]]
if (updateState(cur, v)) cur else tryComplete(v)
case _ => null
}
@@ -108,32 +108,21 @@ object Promise {
}) match {
case null => false
case cs if cs.isEmpty => true
- // this assumes that f(resolved) will go via dispatchFuture
- // and notifyCompleted (see onComplete below)
- case cs => cs.foreach(f => f(resolved)); true
+ case cs => cs.foreach(c => c.dispatch(resolved)); true
}
}
def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = {
- val bound: Either[Throwable, T] => Unit = (either: Either[Throwable, T]) =>
- Future.dispatchFuture(executor, () => notifyCompleted(func, either))
+ val bound = new Future.OnCompleteTask[T](executor, func)
@tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed
def dispatchOrAddCallback(): Unit =
getState match {
- case r: Either[_, _] => bound(r.asInstanceOf[Either[Throwable, T]])
+ case r: Either[_, _] => bound.dispatch(r.asInstanceOf[Either[Throwable, T]])
case listeners: List[_] => if (updateState(listeners, bound :: listeners)) () else dispatchOrAddCallback()
}
dispatchOrAddCallback()
}
-
- private final def notifyCompleted(func: Either[Throwable, T] => Any, result: Either[Throwable, T])(implicit executor: ExecutionContext) {
- try {
- func(result)
- } catch {
- case NonFatal(e) => executor reportFailure e
- }
- }
}
/** An already completed Future is given its result at creation.
@@ -149,8 +138,8 @@ object Promise {
def tryComplete(value: Either[Throwable, T]): Boolean = false
def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = {
- val completedAs = value.get // Avoid closing over "this"
- Future.dispatchFuture(executor, () => func(completedAs))
+ val completedAs = value.get
+ (new Future.OnCompleteTask(executor, func)).dispatch(completedAs)
}
def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this