summaryrefslogtreecommitdiff
path: root/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/library/scala/concurrent/impl/ExecutionContextImpl.scala')
-rw-r--r--src/library/scala/concurrent/impl/ExecutionContextImpl.scala87
1 files changed, 6 insertions, 81 deletions
diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
index 6201de14d7..5dc440f42b 100644
--- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
+++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
@@ -13,8 +13,8 @@ package scala.concurrent.impl
import java.util.concurrent.{Callable, ExecutorService}
import scala.concurrent.forkjoin._
import scala.concurrent.{ExecutionContext, resolver, Awaitable, body2awaitable}
-import scala.util.{ Duration, Try, Success, Failure }
-import scala.collection.mutable.Stack
+import scala.util.{ Try, Success, Failure }
+import scala.concurrent.util.{ Duration }
@@ -37,32 +37,12 @@ private[scala] class ExecutionContextImpl(val executorService: AnyRef) extends E
def run() = body()
})
- def promise[T]: Promise[T] = new Promise.DefaultPromise[T]()(this)
-
- def future[T](body: =>T): Future[T] = {
- val p = promise[T]
-
- dispatchFuture {
- () =>
- p complete {
- try {
- Success(body)
- } catch {
- case e => resolver(e)
- }
- }
- }
-
- p.future
- }
-
- def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost)
+ def blocking[T](body: =>T): T = blocking(body2awaitable(body), Duration.fromNanos(0))
def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = {
- currentExecutionContext.get match {
- case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case
- case x => x.blockingCall(awaitable) // inside an execution context thread
- }
+ Future.releaseStack(this)
+
+ awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence)
}
def reportFailure(t: Throwable) = t match {
@@ -70,61 +50,6 @@ private[scala] class ExecutionContextImpl(val executorService: AnyRef) extends E
case t => t.printStackTrace()
}
- /** Only callable from the tasks running on the same execution context. */
- private def blockingCall[T](body: Awaitable[T]): T = {
- releaseStack()
-
- // TODO see what to do with timeout
- body.await(Duration.fromNanos(0))(CanAwaitEvidence)
- }
-
- // an optimization for batching futures
- // TODO we should replace this with a public queue,
- // so that it can be stolen from
- // OR: a push to the local task queue should be so cheap that this is
- // not even needed, but stealing is still possible
- private val _taskStack = new ThreadLocal[Stack[() => Unit]]()
-
- private def releaseStack(): Unit =
- _taskStack.get match {
- case stack if (stack ne null) && stack.nonEmpty =>
- val tasks = stack.elems
- stack.clear()
- _taskStack.remove()
- dispatchFuture(() => _taskStack.get.elems = tasks, true)
- case null =>
- // do nothing - there is no local batching stack anymore
- case _ =>
- _taskStack.remove()
- }
-
- private[impl] def dispatchFuture(task: () => Unit, force: Boolean = false): Unit =
- _taskStack.get match {
- case stack if (stack ne null) && !force => stack push task
- case _ => this.execute(
- new Runnable {
- def run() {
- try {
- val taskStack = Stack[() => Unit](task)
- _taskStack set taskStack
- while (taskStack.nonEmpty) {
- val next = taskStack.pop()
- try {
- next.apply()
- } catch {
- case e =>
- // TODO catching all and continue isn't good for OOME
- reportFailure(e)
- }
- }
- } finally {
- _taskStack.remove()
- }
- }
- }
- )
- }
-
}