summaryrefslogtreecommitdiff
path: root/src/library/scala/concurrent/impl/Future.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/library/scala/concurrent/impl/Future.scala')
-rw-r--r--src/library/scala/concurrent/impl/Future.scala74
1 files changed, 70 insertions, 4 deletions
diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala
index b4385ea34a..6833b2467f 100644
--- a/src/library/scala/concurrent/impl/Future.scala
+++ b/src/library/scala/concurrent/impl/Future.scala
@@ -8,13 +8,17 @@
package scala.concurrent.impl
+
+
import scala.concurrent.{Awaitable, ExecutionContext}
import scala.util.{ Try, Success, Failure }
-//import scala.util.continuations._
+import scala.collection.mutable.Stack
+
+
private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awaitable[T] {
- implicit def executor: ExecutionContextImpl
+ implicit def executor: ExecutionContext
/** For use only within a Future.flow block or another compatible Delimited Continuations reset block.
*
@@ -40,7 +44,7 @@ private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awa
* that conforms to A's erased type or a ClassCastException otherwise.
*/
final def mapTo[T](implicit m: Manifest[T]) = {
- val p = executor.promise[T]
+ val p = new Promise.DefaultPromise[T]
onComplete {
case f @ Failure(t) => p complete f.asInstanceOf[Try[T]]
@@ -48,7 +52,7 @@ private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awa
p complete (try {
Success(Future.boxedType(m.erasure).cast(v).asInstanceOf[T])
} catch {
- case e: ClassCastException ⇒ Failure(e)
+ case e: ClassCastException => Failure(e)
})
}
@@ -86,4 +90,66 @@ object Future {
def boxedType(c: Class[_]): Class[_] = {
if (c.isPrimitive) toBoxed(c) else c
}
+
+ def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = {
+ val promise = new Promise.DefaultPromise[T]()
+ executor.execute(new Runnable {
+ def run = {
+ promise complete {
+ try {
+ Success(body)
+ } catch {
+ case e => scala.concurrent.resolver(e)
+ }
+ }
+ }
+ })
+ promise.future
+ }
+
+ // an optimization for batching futures
+ // TODO we should replace this with a public queue,
+ // so that it can be stolen from
+ // OR: a push to the local task queue should be so cheap that this is
+ // not even needed, but stealing is still possible
+ private val _taskStack = new ThreadLocal[Stack[() => Unit]]()
+
+ private[impl] def releaseStack(executor: ExecutionContext): Unit =
+ _taskStack.get match {
+ case stack if (stack ne null) && stack.nonEmpty =>
+ val tasks = stack.elems
+ stack.clear()
+ _taskStack.remove()
+ dispatchFuture(executor, () => _taskStack.get.elems = tasks, true)
+ case null =>
+ // do nothing - there is no local batching stack anymore
+ case _ =>
+ _taskStack.remove()
+ }
+
+ private[impl] def dispatchFuture(executor: ExecutionContext, task: () => Unit, force: Boolean = false): Unit =
+ _taskStack.get match {
+ case stack if (stack ne null) && !force => stack push task
+ case _ => executor.execute(new Runnable {
+ def run() {
+ try {
+ val taskStack = Stack[() => Unit](task)
+ _taskStack set taskStack
+ while (taskStack.nonEmpty) {
+ val next = taskStack.pop()
+ try {
+ next.apply()
+ } catch {
+ case e =>
+ // TODO catching all and continue isn't good for OOME
+ executor.reportFailure(e)
+ }
+ }
+ } finally {
+ _taskStack.remove()
+ }
+ }
+ })
+ }
+
}