diff options
Diffstat (limited to 'src/library/scala/concurrent/impl/ExecutionContextImpl.scala')
-rw-r--r-- | src/library/scala/concurrent/impl/ExecutionContextImpl.scala | 87 |
1 files changed, 6 insertions, 81 deletions
diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala index 6201de14d7..5dc440f42b 100644 --- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala +++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala @@ -13,8 +13,8 @@ package scala.concurrent.impl import java.util.concurrent.{Callable, ExecutorService} import scala.concurrent.forkjoin._ import scala.concurrent.{ExecutionContext, resolver, Awaitable, body2awaitable} -import scala.util.{ Duration, Try, Success, Failure } -import scala.collection.mutable.Stack +import scala.util.{ Try, Success, Failure } +import scala.concurrent.util.{ Duration } @@ -37,32 +37,12 @@ private[scala] class ExecutionContextImpl(val executorService: AnyRef) extends E def run() = body() }) - def promise[T]: Promise[T] = new Promise.DefaultPromise[T]()(this) - - def future[T](body: =>T): Future[T] = { - val p = promise[T] - - dispatchFuture { - () => - p complete { - try { - Success(body) - } catch { - case e => resolver(e) - } - } - } - - p.future - } - - def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost) + def blocking[T](body: =>T): T = blocking(body2awaitable(body), Duration.fromNanos(0)) def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = { - currentExecutionContext.get match { - case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case - case x => x.blockingCall(awaitable) // inside an execution context thread - } + Future.releaseStack(this) + + awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) } def reportFailure(t: Throwable) = t match { @@ -70,61 +50,6 @@ private[scala] class ExecutionContextImpl(val executorService: AnyRef) extends E case t => t.printStackTrace() } - /** Only callable from the tasks running on the same execution context. */ - private def blockingCall[T](body: Awaitable[T]): T = { - releaseStack() - - // TODO see what to do with timeout - body.await(Duration.fromNanos(0))(CanAwaitEvidence) - } - - // an optimization for batching futures - // TODO we should replace this with a public queue, - // so that it can be stolen from - // OR: a push to the local task queue should be so cheap that this is - // not even needed, but stealing is still possible - private val _taskStack = new ThreadLocal[Stack[() => Unit]]() - - private def releaseStack(): Unit = - _taskStack.get match { - case stack if (stack ne null) && stack.nonEmpty => - val tasks = stack.elems - stack.clear() - _taskStack.remove() - dispatchFuture(() => _taskStack.get.elems = tasks, true) - case null => - // do nothing - there is no local batching stack anymore - case _ => - _taskStack.remove() - } - - private[impl] def dispatchFuture(task: () => Unit, force: Boolean = false): Unit = - _taskStack.get match { - case stack if (stack ne null) && !force => stack push task - case _ => this.execute( - new Runnable { - def run() { - try { - val taskStack = Stack[() => Unit](task) - _taskStack set taskStack - while (taskStack.nonEmpty) { - val next = taskStack.pop() - try { - next.apply() - } catch { - case e => - // TODO catching all and continue isn't good for OOME - reportFailure(e) - } - } - } finally { - _taskStack.remove() - } - } - } - ) - } - } |