summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoraleksandar <aleksandar@lampmac14.epfl.ch>2012-01-16 13:44:05 +0100
committeraleksandar <aleksandar@lampmac14.epfl.ch>2012-01-16 13:44:05 +0100
commit8b5f05ac364dd13f6b0443690825adc382ff8fc7 (patch)
treead43547ad5d46659166ca5ba0d9f87c4186e1c20
parent7993ec04baf28cd12009d15979c2c904afad89d3 (diff)
downloadscala-8b5f05ac364dd13f6b0443690825adc382ff8fc7.tar.gz
scala-8b5f05ac364dd13f6b0443690825adc382ff8fc7.tar.bz2
scala-8b5f05ac364dd13f6b0443690825adc382ff8fc7.zip
Add execution context implementation to akka futures.
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala8
-rw-r--r--src/library/scala/concurrent/akka/ExecutionContextImpl.scala102
-rw-r--r--src/library/scala/concurrent/akka/Promise.scala85
3 files changed, 163 insertions, 32 deletions
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
index 38a28044e1..078b05c517 100644
--- a/src/library/scala/concurrent/ExecutionContext.scala
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -1,3 +1,11 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
package scala.concurrent
diff --git a/src/library/scala/concurrent/akka/ExecutionContextImpl.scala b/src/library/scala/concurrent/akka/ExecutionContextImpl.scala
new file mode 100644
index 0000000000..28638d1247
--- /dev/null
+++ b/src/library/scala/concurrent/akka/ExecutionContextImpl.scala
@@ -0,0 +1,102 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent.akka
+
+
+
+import java.util.concurrent.{Callable, ExecutorService}
+import scala.concurrent.{ExecutionContext, resolver, Awaitable}
+import scala.util.Duration
+import scala.collection.mutable.Stack
+
+
+
+class ExecutionContextImpl(executorService: ExecutorService) extends ExecutionContext {
+
+ def execute(runnable: Runnable): Unit = executorService execute runnable
+
+ def execute[U](body: () => U): Unit = execute(new Runnable {
+ def run() = body()
+ })
+
+ def promise[T]: Promise[T] = new Promise.DefaultPromise[T]()(this)
+
+ def future[T](body: =>T): Future[T] = {
+ val p = promise[T]
+
+ dispatchFuture {
+ () =>
+ p complete {
+ try {
+ Right(body)
+ } catch {
+ case e => resolver(e)
+ }
+ }
+ }
+
+ p.future
+ }
+
+ /** Only callable from the tasks running on the same execution context. */
+ def blockingCall[T](body: Awaitable[T]): T = {
+ releaseStack()
+
+ // TODO see what to do with timeout
+ body.await(Duration.fromNanos(0))(CanAwaitEvidence)
+ }
+
+ // an optimization for batching futures
+ // TODO we should replace this with a public queue,
+ // so that it can be stolen from
+ // OR: a push to the local task queue should be so cheap that this is
+ // not even needed, but stealing is still possible
+ private val _taskStack = new ThreadLocal[Stack[() => Unit]]()
+
+ private def releaseStack(): Unit =
+ _taskStack.get match {
+ case stack if (stack ne null) && stack.nonEmpty =>
+ val tasks = stack.elems
+ stack.clear()
+ _taskStack.remove()
+ dispatchFuture(() => _taskStack.get.elems = tasks, true)
+ case null =>
+ // do nothing - there is no local batching stack anymore
+ case _ =>
+ _taskStack.remove()
+ }
+
+ private[akka] def dispatchFuture(task: () => Unit, force: Boolean = false): Unit =
+ _taskStack.get match {
+ case stack if (stack ne null) && !force => stack push task
+ case _ => this.execute(
+ new Runnable {
+ def run() {
+ try {
+ val taskStack = Stack[() => Unit](task)
+ _taskStack set taskStack
+ while (taskStack.nonEmpty) {
+ val next = taskStack.pop()
+ try {
+ next.apply()
+ } catch {
+ case e =>
+ // TODO catching all and continue isn't good for OOME
+ e.printStackTrace()
+ }
+ }
+ } finally {
+ _taskStack.remove()
+ }
+ }
+ }
+ )
+ }
+
+}
diff --git a/src/library/scala/concurrent/akka/Promise.scala b/src/library/scala/concurrent/akka/Promise.scala
index d3b93b9573..3b77f14f70 100644
--- a/src/library/scala/concurrent/akka/Promise.scala
+++ b/src/library/scala/concurrent/akka/Promise.scala
@@ -12,7 +12,7 @@ package scala.concurrent.akka
import java.util.concurrent.TimeUnit.{ NANOSECONDS, MILLISECONDS }
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
-import scala.concurrent.{Awaitable, ExecutionContext, resolver, blocking, CanAwait, TimeoutException}
+import scala.concurrent.{Awaitable, ExecutionContext, resolve, resolver, blocking, CanAwait, TimeoutException}
import scala.util.continuations._
import scala.util.Duration
import scala.annotation.tailrec
@@ -21,6 +21,8 @@ import scala.annotation.tailrec
trait Promise[T] extends scala.concurrent.Promise[T] with Future[T] {
+ def future = this
+
// TODO refine answer and return types here from Any to type parameters
// then move this up in the hierarchy
@@ -75,7 +77,7 @@ object Promise {
*/
sealed trait FState[+T] { def value: Option[Either[Throwable, T]] }
- case class Pending[T](listeners: List[Either[Throwable, T] ⇒ Unit] = Nil) extends FState[T] {
+ case class Pending[T](listeners: List[Either[Throwable, T] => Any] = Nil) extends FState[T] {
def value: Option[Either[Throwable, T]] = None
}
@@ -89,8 +91,9 @@ object Promise {
private val emptyPendingValue = Pending[Nothing](Nil)
- /* default promise implementation */
- abstract class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] {
+ /** Default promise implementation.
+ */
+ class DefaultPromise[T](implicit val executor: ExecutionContextImpl) extends AbstractPromise with Promise[T] {
self =>
updater.set(this, Promise.EmptyPending())
@@ -102,7 +105,13 @@ object Promise {
val ms = NANOSECONDS.toMillis(waitTimeNanos)
val ns = (waitTimeNanos % 1000000l).toInt // as per object.wait spec
val start = System.nanoTime()
- try { synchronized { if (value.isEmpty) wait(ms, ns) } } catch { case e: InterruptedException ⇒ }
+ try {
+ synchronized {
+ while (value.isEmpty) wait(ms, ns)
+ }
+ } catch {
+ case e: InterruptedException =>
+ }
awaitUnsafe(waitTimeNanos - (System.nanoTime() - start))
} else
@@ -133,80 +142,92 @@ object Promise {
@inline
protected final def getState: FState[T] = updater.get(this)
- /*
def tryComplete(value: Either[Throwable, T]): Boolean = {
- val callbacks: List[Either[Throwable, T] => Unit] = {
+ val callbacks: List[Either[Throwable, T] => Any] = {
try {
@tailrec
- def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] => Unit] = {
+ def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] => Any] = {
getState match {
case cur @ Pending(listeners) =>
- if (updateState(cur, if (v.isLeft) Failure(Some(v)) else Success(Some(v)))) listeners
- else tryComplete(v)
+ if (updateState(cur, if (v.isLeft) Failure(Some(v)) else Success(Some(v)))) listeners
+ else tryComplete(v)
case _ => null
}
}
tryComplete(resolve(value))
} finally {
- synchronized { notifyAll() } //Notify any evil blockers
+ synchronized { notifyAll() } // notify any blockers from `tryAwait`
}
}
callbacks match {
case null => false
case cs if cs.isEmpty => true
- case cs => Future.dispatchTask(() => cs.foreach(f => notifyCompleted(f, value))); true
+ case cs =>
+ executor dispatchFuture {
+ () => cs.foreach(f => notifyCompleted(f, value))
+ }
+ true
}
}
- def onComplete(func: Either[Throwable, T] => Unit): this.type = {
- @tailrec //Returns whether the future has already been completed or not
+ def onComplete[U](func: Either[Throwable, T] => U): this.type = {
+ @tailrec // Returns whether the future has already been completed or not
def tryAddCallback(): Boolean = {
val cur = getState
cur match {
case _: Success[_] | _: Failure[_] => true
case p: Pending[_] =>
- val pt = p.asInstanceOf[Pending[T]]
- if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false else tryAddCallback()
+ val pt = p.asInstanceOf[Pending[T]]
+ if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false else tryAddCallback()
}
}
if (tryAddCallback()) {
val result = value.get
- Future.dispatchTask(() => notifyCompleted(func, result))
+ executor dispatchFuture {
+ () => notifyCompleted(func, result)
+ }
}
this
}
- private final def notifyCompleted(func: Either[Throwable, T] => Unit, result: Either[Throwable, T]) {
- try { func(result) } catch { case e => logError("Future onComplete-callback raised an exception", e) }
+ private final def notifyCompleted(func: Either[Throwable, T] => Any, result: Either[Throwable, T]) {
+ // TODO see what to do about logging
+ //try {
+ func(result)
+ //} catch {
+ // case e => logError("Future onComplete-callback raised an exception", e)
+ //}
}
- */
}
- /*
- /**
- * An already completed Future is seeded with it's result at creation, is useful for when you are participating in
- * a Future-composition but you already have a value to contribute.
+ /** An already completed Future is given its result at creation.
+ *
+ * Useful in Future-composition when a value to contribute is already available.
*/
- final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val executor: ExecutionContext) extends Promise[T] {
+ final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val executor: ExecutionContextImpl) extends Promise[T] {
val value = Some(resolve(suppliedValue))
-
+
def tryComplete(value: Either[Throwable, T]): Boolean = false
- def onComplete(func: Either[Throwable, T] => Unit): this.type = {
+
+ def onComplete[U](func: Either[Throwable, T] => U): this.type = {
val completedAs = value.get
- Future dispatchTask (() => func(completedAs))
+ executor dispatchFuture {
+ () => func(completedAs)
+ }
this
}
-
- def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this
- def result(atMost: Duration)(implicit permit: CanAwait): T = value.get match {
+
+ private def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this
+
+ def await(atMost: Duration)(implicit permit: CanAwait): T = value.get match {
case Left(e) => throw e
case Right(r) => r
}
}
- */
+
}