diff options
Diffstat (limited to 'src/library/scala/concurrent/impl/Future.scala')
-rw-r--r-- | src/library/scala/concurrent/impl/Future.scala | 74 |
1 files changed, 70 insertions, 4 deletions
diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala index b4385ea34a..6833b2467f 100644 --- a/src/library/scala/concurrent/impl/Future.scala +++ b/src/library/scala/concurrent/impl/Future.scala @@ -8,13 +8,17 @@ package scala.concurrent.impl + + import scala.concurrent.{Awaitable, ExecutionContext} import scala.util.{ Try, Success, Failure } -//import scala.util.continuations._ +import scala.collection.mutable.Stack + + private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awaitable[T] { - implicit def executor: ExecutionContextImpl + implicit def executor: ExecutionContext /** For use only within a Future.flow block or another compatible Delimited Continuations reset block. * @@ -40,7 +44,7 @@ private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awa * that conforms to A's erased type or a ClassCastException otherwise. */ final def mapTo[T](implicit m: Manifest[T]) = { - val p = executor.promise[T] + val p = new Promise.DefaultPromise[T] onComplete { case f @ Failure(t) => p complete f.asInstanceOf[Try[T]] @@ -48,7 +52,7 @@ private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awa p complete (try { Success(Future.boxedType(m.erasure).cast(v).asInstanceOf[T]) } catch { - case e: ClassCastException ⇒ Failure(e) + case e: ClassCastException => Failure(e) }) } @@ -86,4 +90,66 @@ object Future { def boxedType(c: Class[_]): Class[_] = { if (c.isPrimitive) toBoxed(c) else c } + + def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = { + val promise = new Promise.DefaultPromise[T]() + executor.execute(new Runnable { + def run = { + promise complete { + try { + Success(body) + } catch { + case e => scala.concurrent.resolver(e) + } + } + } + }) + promise.future + } + + // an optimization for batching futures + // TODO we should replace this with a public queue, + // so that it can be stolen from + // OR: a push to the local task queue should be so cheap that this is + // not even needed, but stealing is still possible + private val _taskStack = new ThreadLocal[Stack[() => Unit]]() + + private[impl] def releaseStack(executor: ExecutionContext): Unit = + _taskStack.get match { + case stack if (stack ne null) && stack.nonEmpty => + val tasks = stack.elems + stack.clear() + _taskStack.remove() + dispatchFuture(executor, () => _taskStack.get.elems = tasks, true) + case null => + // do nothing - there is no local batching stack anymore + case _ => + _taskStack.remove() + } + + private[impl] def dispatchFuture(executor: ExecutionContext, task: () => Unit, force: Boolean = false): Unit = + _taskStack.get match { + case stack if (stack ne null) && !force => stack push task + case _ => executor.execute(new Runnable { + def run() { + try { + val taskStack = Stack[() => Unit](task) + _taskStack set taskStack + while (taskStack.nonEmpty) { + val next = taskStack.pop() + try { + next.apply() + } catch { + case e => + // TODO catching all and continue isn't good for OOME + executor.reportFailure(e) + } + } + } finally { + _taskStack.remove() + } + } + }) + } + } |