diff options
author | aleksandar <aleksandar@lampmac14.epfl.ch> | 2012-01-16 13:44:05 +0100 |
---|---|---|
committer | aleksandar <aleksandar@lampmac14.epfl.ch> | 2012-01-16 13:44:05 +0100 |
commit | 8b5f05ac364dd13f6b0443690825adc382ff8fc7 (patch) | |
tree | ad43547ad5d46659166ca5ba0d9f87c4186e1c20 | |
parent | 7993ec04baf28cd12009d15979c2c904afad89d3 (diff) | |
download | scala-8b5f05ac364dd13f6b0443690825adc382ff8fc7.tar.gz scala-8b5f05ac364dd13f6b0443690825adc382ff8fc7.tar.bz2 scala-8b5f05ac364dd13f6b0443690825adc382ff8fc7.zip |
Add execution context implementation to akka futures.
-rw-r--r-- | src/library/scala/concurrent/ExecutionContext.scala | 8 | ||||
-rw-r--r-- | src/library/scala/concurrent/akka/ExecutionContextImpl.scala | 102 | ||||
-rw-r--r-- | src/library/scala/concurrent/akka/Promise.scala | 85 |
3 files changed, 163 insertions, 32 deletions
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala index 38a28044e1..078b05c517 100644 --- a/src/library/scala/concurrent/ExecutionContext.scala +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -1,3 +1,11 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + package scala.concurrent diff --git a/src/library/scala/concurrent/akka/ExecutionContextImpl.scala b/src/library/scala/concurrent/akka/ExecutionContextImpl.scala new file mode 100644 index 0000000000..28638d1247 --- /dev/null +++ b/src/library/scala/concurrent/akka/ExecutionContextImpl.scala @@ -0,0 +1,102 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent.akka + + + +import java.util.concurrent.{Callable, ExecutorService} +import scala.concurrent.{ExecutionContext, resolver, Awaitable} +import scala.util.Duration +import scala.collection.mutable.Stack + + + +class ExecutionContextImpl(executorService: ExecutorService) extends ExecutionContext { + + def execute(runnable: Runnable): Unit = executorService execute runnable + + def execute[U](body: () => U): Unit = execute(new Runnable { + def run() = body() + }) + + def promise[T]: Promise[T] = new Promise.DefaultPromise[T]()(this) + + def future[T](body: =>T): Future[T] = { + val p = promise[T] + + dispatchFuture { + () => + p complete { + try { + Right(body) + } catch { + case e => resolver(e) + } + } + } + + p.future + } + + /** Only callable from the tasks running on the same execution context. */ + def blockingCall[T](body: Awaitable[T]): T = { + releaseStack() + + // TODO see what to do with timeout + body.await(Duration.fromNanos(0))(CanAwaitEvidence) + } + + // an optimization for batching futures + // TODO we should replace this with a public queue, + // so that it can be stolen from + // OR: a push to the local task queue should be so cheap that this is + // not even needed, but stealing is still possible + private val _taskStack = new ThreadLocal[Stack[() => Unit]]() + + private def releaseStack(): Unit = + _taskStack.get match { + case stack if (stack ne null) && stack.nonEmpty => + val tasks = stack.elems + stack.clear() + _taskStack.remove() + dispatchFuture(() => _taskStack.get.elems = tasks, true) + case null => + // do nothing - there is no local batching stack anymore + case _ => + _taskStack.remove() + } + + private[akka] def dispatchFuture(task: () => Unit, force: Boolean = false): Unit = + _taskStack.get match { + case stack if (stack ne null) && !force => stack push task + case _ => this.execute( + new Runnable { + def run() { + try { + val taskStack = Stack[() => Unit](task) + _taskStack set taskStack + while (taskStack.nonEmpty) { + val next = taskStack.pop() + try { + next.apply() + } catch { + case e => + // TODO catching all and continue isn't good for OOME + e.printStackTrace() + } + } + } finally { + _taskStack.remove() + } + } + } + ) + } + +} diff --git a/src/library/scala/concurrent/akka/Promise.scala b/src/library/scala/concurrent/akka/Promise.scala index d3b93b9573..3b77f14f70 100644 --- a/src/library/scala/concurrent/akka/Promise.scala +++ b/src/library/scala/concurrent/akka/Promise.scala @@ -12,7 +12,7 @@ package scala.concurrent.akka import java.util.concurrent.TimeUnit.{ NANOSECONDS, MILLISECONDS } import java.util.concurrent.atomic.AtomicReferenceFieldUpdater -import scala.concurrent.{Awaitable, ExecutionContext, resolver, blocking, CanAwait, TimeoutException} +import scala.concurrent.{Awaitable, ExecutionContext, resolve, resolver, blocking, CanAwait, TimeoutException} import scala.util.continuations._ import scala.util.Duration import scala.annotation.tailrec @@ -21,6 +21,8 @@ import scala.annotation.tailrec trait Promise[T] extends scala.concurrent.Promise[T] with Future[T] { + def future = this + // TODO refine answer and return types here from Any to type parameters // then move this up in the hierarchy @@ -75,7 +77,7 @@ object Promise { */ sealed trait FState[+T] { def value: Option[Either[Throwable, T]] } - case class Pending[T](listeners: List[Either[Throwable, T] ⇒ Unit] = Nil) extends FState[T] { + case class Pending[T](listeners: List[Either[Throwable, T] => Any] = Nil) extends FState[T] { def value: Option[Either[Throwable, T]] = None } @@ -89,8 +91,9 @@ object Promise { private val emptyPendingValue = Pending[Nothing](Nil) - /* default promise implementation */ - abstract class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] { + /** Default promise implementation. + */ + class DefaultPromise[T](implicit val executor: ExecutionContextImpl) extends AbstractPromise with Promise[T] { self => updater.set(this, Promise.EmptyPending()) @@ -102,7 +105,13 @@ object Promise { val ms = NANOSECONDS.toMillis(waitTimeNanos) val ns = (waitTimeNanos % 1000000l).toInt // as per object.wait spec val start = System.nanoTime() - try { synchronized { if (value.isEmpty) wait(ms, ns) } } catch { case e: InterruptedException ⇒ } + try { + synchronized { + while (value.isEmpty) wait(ms, ns) + } + } catch { + case e: InterruptedException => + } awaitUnsafe(waitTimeNanos - (System.nanoTime() - start)) } else @@ -133,80 +142,92 @@ object Promise { @inline protected final def getState: FState[T] = updater.get(this) - /* def tryComplete(value: Either[Throwable, T]): Boolean = { - val callbacks: List[Either[Throwable, T] => Unit] = { + val callbacks: List[Either[Throwable, T] => Any] = { try { @tailrec - def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] => Unit] = { + def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] => Any] = { getState match { case cur @ Pending(listeners) => - if (updateState(cur, if (v.isLeft) Failure(Some(v)) else Success(Some(v)))) listeners - else tryComplete(v) + if (updateState(cur, if (v.isLeft) Failure(Some(v)) else Success(Some(v)))) listeners + else tryComplete(v) case _ => null } } tryComplete(resolve(value)) } finally { - synchronized { notifyAll() } //Notify any evil blockers + synchronized { notifyAll() } // notify any blockers from `tryAwait` } } callbacks match { case null => false case cs if cs.isEmpty => true - case cs => Future.dispatchTask(() => cs.foreach(f => notifyCompleted(f, value))); true + case cs => + executor dispatchFuture { + () => cs.foreach(f => notifyCompleted(f, value)) + } + true } } - def onComplete(func: Either[Throwable, T] => Unit): this.type = { - @tailrec //Returns whether the future has already been completed or not + def onComplete[U](func: Either[Throwable, T] => U): this.type = { + @tailrec // Returns whether the future has already been completed or not def tryAddCallback(): Boolean = { val cur = getState cur match { case _: Success[_] | _: Failure[_] => true case p: Pending[_] => - val pt = p.asInstanceOf[Pending[T]] - if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false else tryAddCallback() + val pt = p.asInstanceOf[Pending[T]] + if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false else tryAddCallback() } } if (tryAddCallback()) { val result = value.get - Future.dispatchTask(() => notifyCompleted(func, result)) + executor dispatchFuture { + () => notifyCompleted(func, result) + } } this } - private final def notifyCompleted(func: Either[Throwable, T] => Unit, result: Either[Throwable, T]) { - try { func(result) } catch { case e => logError("Future onComplete-callback raised an exception", e) } + private final def notifyCompleted(func: Either[Throwable, T] => Any, result: Either[Throwable, T]) { + // TODO see what to do about logging + //try { + func(result) + //} catch { + // case e => logError("Future onComplete-callback raised an exception", e) + //} } - */ } - /* - /** - * An already completed Future is seeded with it's result at creation, is useful for when you are participating in - * a Future-composition but you already have a value to contribute. + /** An already completed Future is given its result at creation. + * + * Useful in Future-composition when a value to contribute is already available. */ - final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val executor: ExecutionContext) extends Promise[T] { + final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val executor: ExecutionContextImpl) extends Promise[T] { val value = Some(resolve(suppliedValue)) - + def tryComplete(value: Either[Throwable, T]): Boolean = false - def onComplete(func: Either[Throwable, T] => Unit): this.type = { + + def onComplete[U](func: Either[Throwable, T] => U): this.type = { val completedAs = value.get - Future dispatchTask (() => func(completedAs)) + executor dispatchFuture { + () => func(completedAs) + } this } - - def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this - def result(atMost: Duration)(implicit permit: CanAwait): T = value.get match { + + private def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this + + def await(atMost: Duration)(implicit permit: CanAwait): T = value.get match { case Left(e) => throw e case Right(r) => r } } - */ + } |