From a3d3a5352919439b0efeb493c6ab3fb8c6bf14b0 Mon Sep 17 00:00:00 2001 From: Havoc Pennington Date: Fri, 4 May 2012 10:36:58 -0400 Subject: rework Future.dispatchFuture a bit to fix bugs / optimize This fixes a bug where _taskStack could batch a task into the wrong executor, as previously commented in the code. It now uses the _taskStack machinery for the Future.apply dispatch in addition to callback dispatch, so we can batch Future(body) as well. Less significantly, it micro-optimizes by combining some different closures and Runnable into a Task object, so there aren't as many objects created when storing and dispatching a callback. So it saves a bit of memory and runtime perhaps. --- src/library/scala/concurrent/impl/Future.scala | 106 +++++++++++++++++------- src/library/scala/concurrent/impl/Promise.scala | 25 ++---- 2 files changed, 82 insertions(+), 49 deletions(-) (limited to 'src/library') diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala index 47534e398b..6a3487adde 100644 --- a/src/library/scala/concurrent/impl/Future.scala +++ b/src/library/scala/concurrent/impl/Future.scala @@ -44,13 +44,13 @@ private[concurrent] object Future { } def boxedType(c: Class[_]): Class[_] = if (c.isPrimitive) toBoxed(c) else c - - def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = { + + private[impl] class PromiseCompletingTask[T](override val executor: ExecutionContext, body: => T) + extends Task { val promise = new Promise.DefaultPromise[T]() - //TODO: use `dispatchFuture`? - executor.execute(new Runnable { - def run = promise complete { + protected override def task() = { + promise complete { try Right(body) catch { case NonFatal(e) => // Commenting out reporting for now, since it produces too much output in the tests @@ -58,9 +58,14 @@ private[concurrent] object Future { Left(e) } } - }) - - promise.future + } + } + + def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = { + val task = new PromiseCompletingTask(executor, body) + task.dispatch() + + task.promise.future } private[impl] val throwableId: Throwable => Throwable = identity _ @@ -70,38 +75,77 @@ private[concurrent] object Future { // so that it can be stolen from // OR: a push to the local task queue should be so cheap that this is // not even needed, but stealing is still possible - private val _taskStack = new ThreadLocal[Stack[() => Unit]]() + + private[impl] case class TaskStack(stack: Stack[Task], executor: ExecutionContext) + + private val _taskStack = new ThreadLocal[TaskStack]() + + private[impl] trait Task extends Runnable { + def executor: ExecutionContext + + // run the original callback (no dispatch) + protected def task(): Unit + + // we implement Runnable to avoid creating + // an extra object. run() runs ourselves with + // a TaskStack pushed, and then runs any + // other tasks that show up in the stack. + final override def run() = { + try { + val taskStack = TaskStack(Stack[Task](this), executor) + _taskStack set taskStack + while (taskStack.stack.nonEmpty) { + val next = taskStack.stack.pop() + require(next.executor eq executor) + try next.task() catch { case NonFatal(e) => executor reportFailure e } + } + } finally { + _taskStack.remove() + } + } + + // send the task to the running executor.execute() via + // _taskStack, or start a new executor.execute() + def dispatch(force: Boolean = false): Unit = + _taskStack.get match { + case stack if (stack ne null) && (executor eq stack.executor) && !force => stack.stack push this + case _ => executor.execute(this) + } + } + + private[impl] class ReleaseTask(override val executor: ExecutionContext, val elems: List[Task]) + extends Task { + protected override def task() = { + _taskStack.get.stack.elems = elems + } + } private[impl] def releaseStack(executor: ExecutionContext): Unit = _taskStack.get match { - case stack if (stack ne null) && stack.nonEmpty => - val tasks = stack.elems - stack.clear() + case stack if (stack ne null) && stack.stack.nonEmpty => + val tasks = stack.stack.elems + stack.stack.clear() _taskStack.remove() - dispatchFuture(executor, () => _taskStack.get.elems = tasks, true) + val release = new ReleaseTask(executor, tasks) + release.dispatch(force=true) case null => // do nothing - there is no local batching stack anymore case _ => _taskStack.remove() } - private[impl] def dispatchFuture(executor: ExecutionContext, task: () => Unit, force: Boolean = false): Unit = - _taskStack.get match { - case stack if (stack ne null) && !force => stack push task // FIXME we can't mix tasks aimed for different ExecutionContexts see: https://github.com/akka/akka/blob/v2.0.1/akka-actor/src/main/scala/akka/dispatch/Future.scala#L373 - case _ => executor.execute(new Runnable { - def run() { - try { - val taskStack = Stack[() => Unit](task) - _taskStack set taskStack - while (taskStack.nonEmpty) { - val next = taskStack.pop() - try next() catch { case NonFatal(e) => executor reportFailure e } - } - } finally { - _taskStack.remove() - } - } - }) + private[impl] class OnCompleteTask[T](override val executor: ExecutionContext, val onComplete: (Either[Throwable, T]) => Any) + extends Task { + private var value: Either[Throwable, T] = null + + protected override def task() = { + require(value ne null) // dispatch(value) must be called before dispatch() + onComplete(value) } - + + def dispatch(value: Either[Throwable, T]): Unit = { + this.value = value + dispatch() + } + } } diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index 1d573ef818..c5060a2368 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -94,10 +94,10 @@ object Promise { val resolved = resolveEither(value) (try { @tailrec - def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] => Unit] = { + def tryComplete(v: Either[Throwable, T]): List[Future.OnCompleteTask[T]] = { getState match { case raw: List[_] => - val cur = raw.asInstanceOf[List[Either[Throwable, T] => Unit]] + val cur = raw.asInstanceOf[List[Future.OnCompleteTask[T]]] if (updateState(cur, v)) cur else tryComplete(v) case _ => null } @@ -108,32 +108,21 @@ object Promise { }) match { case null => false case cs if cs.isEmpty => true - // this assumes that f(resolved) will go via dispatchFuture - // and notifyCompleted (see onComplete below) - case cs => cs.foreach(f => f(resolved)); true + case cs => cs.foreach(c => c.dispatch(resolved)); true } } def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = { - val bound: Either[Throwable, T] => Unit = (either: Either[Throwable, T]) => - Future.dispatchFuture(executor, () => notifyCompleted(func, either)) + val bound = new Future.OnCompleteTask[T](executor, func) @tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed def dispatchOrAddCallback(): Unit = getState match { - case r: Either[_, _] => bound(r.asInstanceOf[Either[Throwable, T]]) + case r: Either[_, _] => bound.dispatch(r.asInstanceOf[Either[Throwable, T]]) case listeners: List[_] => if (updateState(listeners, bound :: listeners)) () else dispatchOrAddCallback() } dispatchOrAddCallback() } - - private final def notifyCompleted(func: Either[Throwable, T] => Any, result: Either[Throwable, T])(implicit executor: ExecutionContext) { - try { - func(result) - } catch { - case NonFatal(e) => executor reportFailure e - } - } } /** An already completed Future is given its result at creation. @@ -149,8 +138,8 @@ object Promise { def tryComplete(value: Either[Throwable, T]): Boolean = false def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = { - val completedAs = value.get // Avoid closing over "this" - Future.dispatchFuture(executor, () => func(completedAs)) + val completedAs = value.get + (new Future.OnCompleteTask(executor, func)).dispatch(completedAs) } def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this -- cgit v1.2.3