summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoraleksandar <aleksandar@lampmac14.epfl.ch>2012-01-13 17:07:49 +0100
committeraleksandar <aleksandar@lampmac14.epfl.ch>2012-01-13 17:07:49 +0100
commitf8c3f31f2fbf1544723e4cc3fe4af602dab62372 (patch)
tree83bdcd4f6130bac6b738960c607cc629218813f6
parent5d2acb2b3d6b2880ba36f039bbf98c583ce85a21 (diff)
downloadscala-f8c3f31f2fbf1544723e4cc3fe4af602dab62372.tar.gz
scala-f8c3f31f2fbf1544723e4cc3fe4af602dab62372.tar.bz2
scala-f8c3f31f2fbf1544723e4cc3fe4af602dab62372.zip
Work in progress on porting akka promises and futures.
-rw-r--r--src/library/scala/concurrent/Future.scala45
-rw-r--r--src/library/scala/concurrent/Promise.scala33
-rw-r--r--src/library/scala/concurrent/akka/Future.scala181
-rw-r--r--src/library/scala/concurrent/akka/Promise.scala63
-rw-r--r--src/library/scala/concurrent/akka/package.scala36
-rw-r--r--src/library/scala/concurrent/default/TaskImpl.scala45
-rw-r--r--src/library/scala/concurrent/package.scala12
7 files changed, 231 insertions, 184 deletions
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index d074dbfaaa..4002239fc4 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -179,8 +179,10 @@ self =>
val p = newPromise[U]
onComplete {
- case Left(t) => if (pf isDefinedAt t) p success pf(t) else p failure t
- case Right(v) => p success v
+ case Left(t) if pf isDefinedAt t =>
+ try { p success pf(t) }
+ catch { case t: Throwable => p complete resolver(t) }
+ case otherwise => p complete otherwise
}
p.future
@@ -206,7 +208,11 @@ self =>
onComplete {
case Left(t) => p failure t
- case Right(v) => p success f(v)
+ case Right(v) =>
+ try p success f(v)
+ catch {
+ case t => p complete resolver(t)
+ }
}
p.future
@@ -224,10 +230,15 @@ self =>
onComplete {
case Left(t) => p failure t
- case Right(v) => f(v) onComplete {
- case Left(t) => p failure t
- case Right(v) => p success v
- }
+ case Right(v) =>
+ try {
+ f(v) onComplete {
+ case Left(t) => p failure t
+ case Right(v) => p success v
+ }
+ } catch {
+ case t: Throwable => p complete resolver(t)
+ }
}
p.future
@@ -254,7 +265,13 @@ self =>
onComplete {
case Left(t) => p failure t
- case Right(v) => if (pred(v)) p success v else p failure new NoSuchElementException("Future.filter predicate is not satisfied by: " + v)
+ case Right(v) =>
+ try {
+ if (pred(v)) p success v
+ else p failure new NoSuchElementException("Future.filter predicate is not satisfied by: " + v)
+ } catch {
+ case t: Throwable => p complete resolver(t)
+ }
}
p.future
@@ -285,7 +302,13 @@ self =>
onComplete {
case Left(t) => p failure t
- case Right(v) => if (pf.isDefinedAt(v)) p success pf(v) else p failure new NoSuchElementException("Future.collect partial function is not defined at: " + v)
+ case Right(v) =>
+ try {
+ if (pf.isDefinedAt(v)) p success pf(v)
+ else p failure new NoSuchElementException("Future.collect partial function is not defined at: " + v)
+ } catch {
+ case t: Throwable => p complete resolver(t)
+ }
}
p.future
@@ -295,6 +318,8 @@ self =>
* the result of the `that` future if `that` is completed successfully.
* If both futures are failed, the resulting future holds the throwable object of the first future.
*
+ * Using this method will not cause concurrent programs to become nondeterministic.
+ *
* Example:
* {{{
* val f = future { sys.error("failed") }
@@ -326,7 +351,7 @@ self =>
* {{{
* val f = future { sys.error("failed") }
* val g = future { 5 }
- * val h = f orElse g
+ * val h = f or g
* await(0) h // evaluates to either 5 or throws a runtime exception
* }}}
*/
diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala
index f6ea252f73..e5557ae1c3 100644
--- a/src/library/scala/concurrent/Promise.scala
+++ b/src/library/scala/concurrent/Promise.scala
@@ -35,13 +35,42 @@ trait Promise[T] {
*/
def future: Future[T]
+ private def throwCompleted = throw new IllegalStateException("Promise already completed.")
+
+ /** Completes the promise with either an exception or a value.
+ *
+ * @param result Either the value or the exception to complete the promise with.
+ *
+ * $promiseCompletion
+ */
+ def complete(result: Either[Throwable, T]): this.type = if (tryComplete(result)) this else throwCompleted
+
+ /** Tries to complete the promise with either a value or the exception.
+ *
+ * $nonDeterministic
+ *
+ * @return If the promise has already been completed returns `false`, or `true` otherwise.
+ */
+ def tryComplete(result: Either[Throwable, T]): Boolean
+
+ /** Completes this promise with the specified future, once that future is completed.
+ *
+ * @return This promise
+ */
+ final def completeWith(other: Future[T]): this.type = {
+ other onComplete {
+ this complete _
+ }
+ this
+ }
+
/** Completes the promise with a value.
*
* @param value The value to complete the promise with.
*
* $promiseCompletion
*/
- def success(v: T): this.type = if (trySuccess(v)) this else throw new IllegalStateException("Promise already completed.")
+ def success(v: T): this.type = if (trySuccess(v)) this else throwCompleted
/** Tries to complete the promise with a value.
*
@@ -59,7 +88,7 @@ trait Promise[T] {
*
* $promiseCompletion
*/
- def failure(t: Throwable): this.type = if (tryFailure(t)) this else throw new IllegalStateException("Promise already completed.")
+ def failure(t: Throwable): this.type = if (tryFailure(t)) this else throwCompleted
/** Tries to complete the promise with an exception.
*
diff --git a/src/library/scala/concurrent/akka/Future.scala b/src/library/scala/concurrent/akka/Future.scala
index 2b41c0c62e..c48009554c 100644
--- a/src/library/scala/concurrent/akka/Future.scala
+++ b/src/library/scala/concurrent/akka/Future.scala
@@ -1,4 +1,4 @@
-/*/* __ *\
+/* __ *\
** ________ ___ / / ___ Scala API **
** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
@@ -10,178 +10,67 @@ package scala.concurrent.akka
+import scala.concurrent.{Awaitable, ExecutionContext}
+import scala.util.continuations._
-sealed trait Future[+T] extends scala.concurrent.Future with Awaitable[T] {
+trait Future[+T] extends scala.concurrent.Future[T] with Awaitable[T] {
implicit def executor: ExecutionContext
- /**
- * For use only within a Future.flow block or another compatible Delimited Continuations reset block.
- *
- * Returns the result of this Future without blocking, by suspending execution and storing it as a
- * continuation until the result is available.
+ /** For use only within a Future.flow block or another compatible Delimited Continuations reset block.
+ *
+ * Returns the result of this Future without blocking, by suspending execution and storing it as a
+ * continuation until the result is available.
*/
- def apply(): T @cps[Future[Any]] = shift(this flatMap (_: T ⇒ Future[Any]))
+ def apply(): T @cps[Future[Any]] = shift(this flatMap (_: T => Future[Any]))
- /**
- * Tests whether this Future has been completed.
+ /** Tests whether this Future has been completed.
*/
final def isCompleted: Boolean = value.isDefined
- /**
- * The contained value of this Future. Before this Future is completed
- * the value will be None. After completion the value will be Some(Right(t))
- * if it contains a valid result, or Some(Left(error)) if it contains
- * an exception.
+ /** The contained value of this Future. Before this Future is completed
+ * the value will be None. After completion the value will be Some(Right(t))
+ * if it contains a valid result, or Some(Left(error)) if it contains
+ * an exception.
*/
def value: Option[Either[Throwable, T]]
- def onComplete(func: Either[Throwable, T] => Unit): this.type
+ def onComplete[U](func: Either[Throwable, T] => U): this.type
- /**
- * Creates a Future that will be the result of the first completed Future of this and the Future that was passed into this.
- * This is semantically the same as: Future.firstCompletedOf(Seq(this, that))
+ /** Creates a new Future[A] which is completed with this Future's result if
+ * that conforms to A's erased type or a ClassCastException otherwise.
*/
- //FIXME implement as the result of any of the Futures, or if both failed, the first failure
- def orElse[A >: T](that: Future[A]): Future[A] = Future.firstCompletedOf(List(this, that)) //TODO Optimize
-
- final def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = {
- val future = Promise[A]()
+ final def mapTo[T](implicit m: Manifest[T]) = {
+ val p = executor.promise[T]
+
onComplete {
- case Left(e) if pf isDefinedAt e ⇒ future.complete(try { Right(pf(e)) } catch { case x: Exception ⇒ Left(x) })
- case otherwise ⇒ future complete otherwise
- }
- future
- }
-
- /**
- * Creates a new Future by applying a function to the successful result of
- * this Future. If this Future is completed with an exception then the new
- * Future will also contain this exception.
- * Example:
- * <pre>
- * val future1 = for {
- * a: Int <- actor ? "Hello" // returns 5
- * b: String <- actor ? a // returns "10"
- * c: String <- actor ? 7 // returns "14"
- * } yield b + "-" + c
- * </pre>
- */
- final def map[A](f: T ⇒ A): Future[A] = {
- val future = Promise[A]()
- onComplete {
- case l: Left[_, _] ⇒ future complete l.asInstanceOf[Either[Throwable, A]]
- case Right(res) ⇒
- future complete (try {
- Right(f(res))
- } catch {
- case e ⇒
- logError("Future.map", e)
- Left(e)
- })
- }
- future
- }
-
- /**
- * Creates a new Future[A] which is completed with this Future's result if
- * that conforms to A's erased type or a ClassCastException otherwise.
- */
- final def mapTo[A](implicit m: Manifest[A]): Future[A] = {
- val fa = Promise[A]()
- onComplete {
- case l: Left[_, _] ⇒ fa complete l.asInstanceOf[Either[Throwable, A]]
- case Right(t) ⇒
- fa complete (try {
- Right(BoxedType(m.erasure).cast(t).asInstanceOf[A])
+ case l @ Left(t) => p complete l.asInstanceOf[Either[Throwable, T]]
+ case Right(v) =>
+ p complete (try {
+ Right(boxedType(m.erasure).cast(v).asInstanceOf[T])
} catch {
case e: ClassCastException ⇒ Left(e)
})
}
- fa
+
+ p.future
}
- /**
- * Creates a new Future by applying a function to the successful result of
- * this Future, and returns the result of the function as the new Future.
- * If this Future is completed with an exception then the new Future will
- * also contain this exception.
- * Example:
- * <pre>
- * val future1 = for {
- * a: Int <- actor ? "Hello" // returns 5
- * b: String <- actor ? a // returns "10"
- * c: String <- actor ? 7 // returns "14"
- * } yield b + "-" + c
- * </pre>
+ /** Used by for-comprehensions.
*/
- final def flatMap[A](f: T ⇒ Future[A]): Future[A] = {
- val p = Promise[A]()
+ final def withFilter(p: T => Boolean) = new FutureWithFilter[T](this, p)
- onComplete {
- case l: Left[_, _] ⇒ p complete l.asInstanceOf[Either[Throwable, A]]
- case Right(r) ⇒
- try {
- p completeWith f(r)
- } catch {
- case e ⇒
- p complete Left(e)
- logError("Future.flatMap", e)
- }
- }
- p
- }
-
- /**
- * Same as onSuccess { case r => f(r) } but is also used in for-comprehensions
- */
- final def foreach(f: T ⇒ Unit): Unit = onComplete {
- case Right(r) ⇒ f(r)
- case _ ⇒
- }
-
- /**
- * Used by for-comprehensions
- */
- final def withFilter(p: T ⇒ Boolean) = new FutureWithFilter[T](this, p)
-
- final class FutureWithFilter[+A](self: Future[A], p: A ⇒ Boolean) {
- def foreach(f: A ⇒ Unit): Unit = self filter p foreach f
- def map[B](f: A ⇒ B): Future[B] = self filter p map f
- def flatMap[B](f: A ⇒ Future[B]): Future[B] = self filter p flatMap f
- def withFilter(q: A ⇒ Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x ⇒ p(x) && q(x))
- }
-
- /**
- * Returns a new Future that will hold the successful result of this Future if it matches
- * the given predicate, if it doesn't match, the resulting Future will be a failed Future
- * with a MatchError, of if this Future fails, that failure will be propagated to the returned Future
- */
- final def filter(pred: T ⇒ Boolean): Future[T] = {
- val p = Promise[T]()
- onComplete {
- case l: Left[_, _] ⇒ p complete l.asInstanceOf[Either[Throwable, T]]
- case r @ Right(res) ⇒ p complete (try {
- if (pred(res)) r else Left(new MatchError(res))
- } catch {
- case e ⇒
- logError("Future.filter", e)
- Left(e)
- })
- }
- p
- }
-
- protected def logError(msg: String, problem: Throwable): Unit = {
- executor match {
- case m: MessageDispatcher ⇒ m.prerequisites.eventStream.publish(Error(problem, msg, problem.getMessage))
- case other ⇒ problem.printStackTrace()
- }
+ final class FutureWithFilter[+A](self: Future[A], p: A => Boolean) {
+ def foreach(f: A => Unit): Unit = self filter p foreach f
+ def map[B](f: A => B) = self filter p map f
+ def flatMap[B](f: A => Future[B]) = self filter p flatMap f
+ def withFilter(q: A => Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x ⇒ p(x) && q(x))
}
+
}
-*/
+
diff --git a/src/library/scala/concurrent/akka/Promise.scala b/src/library/scala/concurrent/akka/Promise.scala
new file mode 100644
index 0000000000..a47dee48e2
--- /dev/null
+++ b/src/library/scala/concurrent/akka/Promise.scala
@@ -0,0 +1,63 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent.akka
+
+
+
+import scala.concurrent.{ExecutionContext, resolver}
+import scala.util.continuations._
+
+
+
+trait Promise[T] extends scala.concurrent.Promise[T] with Future[T] {
+
+ // TODO refine answer and return types here from Any to type parameters
+
+ final def <<(value: T): Future[T] @cps[Future[Any]] = shift {
+ cont: (Future[T] => Future[Any]) =>
+ cont(complete(Right(value)))
+ }
+
+ final def <<(other: Future[T]): Future[T] @cps[Future[Any]] = shift {
+ cont: (Future[T] => Future[Any]) =>
+ val p = executor.promise[Any]
+ val thisPromise = this
+
+ thisPromise completeWith other
+ thisPromise onComplete { v =>
+ try {
+ p completeWith cont(thisPromise)
+ } catch {
+ case e => p complete resolver(e)
+ }
+ }
+
+ p.future
+ }
+
+ // TODO finish this once we introduce something like dataflow streams
+
+ /*
+ final def <<(stream: PromiseStreamOut[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] => Future[Any]) =>
+ val fr = executor.promise[Any]
+ val f = stream.dequeue(this)
+ f.onComplete { _ =>
+ try {
+ fr completeWith cont(f)
+ } catch {
+ case e =>
+ fr failure e
+ }
+ }
+ fr
+ }
+ */
+
+}
+
diff --git a/src/library/scala/concurrent/akka/package.scala b/src/library/scala/concurrent/akka/package.scala
new file mode 100644
index 0000000000..59eda5a3b4
--- /dev/null
+++ b/src/library/scala/concurrent/akka/package.scala
@@ -0,0 +1,36 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent
+
+
+
+import java.{lang => jl}
+
+
+
+package object akka {
+
+ private val toBoxed = Map[Class[_], Class[_]](
+ classOf[Boolean] -> classOf[jl.Boolean],
+ classOf[Byte] -> classOf[jl.Byte],
+ classOf[Char] -> classOf[jl.Character],
+ classOf[Short] -> classOf[jl.Short],
+ classOf[Int] -> classOf[jl.Integer],
+ classOf[Long] -> classOf[jl.Long],
+ classOf[Float] -> classOf[jl.Float],
+ classOf[Double] -> classOf[jl.Double],
+ classOf[Unit] -> classOf[scala.runtime.BoxedUnit])
+
+ def boxedType(c: Class[_]): Class[_] = {
+ if (c.isPrimitive) toBoxed(c) else c
+ }
+
+}
+
+
diff --git a/src/library/scala/concurrent/default/TaskImpl.scala b/src/library/scala/concurrent/default/TaskImpl.scala
index 52d2ea8cfb..716b9c02f1 100644
--- a/src/library/scala/concurrent/default/TaskImpl.scala
+++ b/src/library/scala/concurrent/default/TaskImpl.scala
@@ -11,9 +11,9 @@ import scala.annotation.tailrec
private[concurrent] trait Completable[T] {
- self: Future[T] =>
+self: Future[T] =>
- val executionContext: ExecutionContextImpl
+ val executor: ExecutionContextImpl
type Callback = Either[Throwable, T] => Any
@@ -62,9 +62,9 @@ private[concurrent] trait Completable[T] {
}
private[concurrent] class PromiseImpl[T](context: ExecutionContextImpl)
- extends Promise[T] with Future[T] with Completable[T] {
+extends Promise[T] with Future[T] with Completable[T] {
- val executionContext: scala.concurrent.default.ExecutionContextImpl = context
+ val executor: scala.concurrent.default.ExecutionContextImpl = context
@volatile private var state: State[T] = _
@@ -85,40 +85,35 @@ private[concurrent] class PromiseImpl[T](context: ExecutionContextImpl)
case _ => null
}
- /** Completes the promise with a value.
- *
- * @param value The value to complete the promise with.
- *
- * $promiseCompletion
- */
- def success(value: T): Unit = {
+ def tryComplete(r: Either[Throwable, T]) = r match {
+ case Left(t) => tryFailure(t)
+ case Right(v) => trySuccess(v)
+ }
+
+ def trySuccess(value: T): Boolean = {
val cbs = tryCompleteState(Success(value))
if (cbs == null)
- throw new IllegalStateException
+ false
else {
processCallbacks(cbs, Right(value))
this.synchronized {
this.notifyAll()
}
+ true
}
}
- /** Completes the promise with an exception.
- *
- * @param t The throwable to complete the promise with.
- *
- * $promiseCompletion
- */
- def failure(t: Throwable): Unit = {
+ def tryFailure(t: Throwable): Boolean = {
val wrapped = wrap(t)
val cbs = tryCompleteState(Failure(wrapped))
if (cbs == null)
- throw new IllegalStateException
+ false
else {
processCallbacks(cbs, Left(wrapped))
this.synchronized {
this.notifyAll()
}
+ true
}
}
@@ -140,9 +135,9 @@ private[concurrent] class PromiseImpl[T](context: ExecutionContextImpl)
}
private[concurrent] class TaskImpl[T](context: ExecutionContextImpl, body: => T)
- extends RecursiveAction with Task[T] with Future[T] with Completable[T] {
+extends RecursiveAction with Task[T] with Future[T] with Completable[T] {
- val executionContext: ExecutionContextImpl = context
+ val executor: ExecutionContextImpl = context
@volatile private var state: State[T] = _
@@ -179,8 +174,8 @@ private[concurrent] class TaskImpl[T](context: ExecutionContextImpl, body: => T)
def start(): Unit = {
Thread.currentThread match {
- case fj: ForkJoinWorkerThread if fj.getPool eq executionContext.pool => fork()
- case _ => executionContext.pool.execute(this)
+ case fj: ForkJoinWorkerThread if fj.getPool eq executor.pool => fork()
+ case _ => executor.pool.execute(this)
}
}
@@ -264,7 +259,7 @@ private[concurrent] final class ExecutionContextImpl extends ExecutionContext {
// TODO fix the timeout
def blockingCall[T](timeout: Timeout, b: Awaitable[T]): T = b match {
- case fj: TaskImpl[_] if fj.executionContext.pool eq pool =>
+ case fj: TaskImpl[_] if fj.executor.pool eq pool =>
fj.await(timeout)
case _ =>
var res: T = null.asInstanceOf[T]
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
index c35ece5668..ce22c53c72 100644
--- a/src/library/scala/concurrent/package.scala
+++ b/src/library/scala/concurrent/package.scala
@@ -50,12 +50,22 @@ package object concurrent {
case _ => true
}
- private[concurrent] def resolveThrowable[T](source: Either[Throwable, T]): Either[Throwable, T] = source match {
+ private[concurrent] def resolve[T](source: Either[Throwable, T]): Either[Throwable, T] = source match {
case Left(t: scala.runtime.NonLocalReturnControl[_]) => Right(t.value.asInstanceOf[T])
case Left(t: InterruptedException) => Left(new ExecutionException("Boxed InterruptedException", t))
+ case Left(e: Error) => throw e
case _ => source
}
+ private val resolverFunction: PartialFunction[Throwable, Either[Throwable, _]] = {
+ case t: scala.runtime.NonLocalReturnControl[_] => Right(t.value)
+ case t: InterruptedException => Left(new ExecutionException("Boxed InterruptedException", t))
+ case e: Error => throw e
+ case t => Left(t)
+ }
+
+ private[concurrent] def resolver[T] = resolverFunction.asInstanceOf[PartialFunction[Throwable, Either[Throwable, T]]]
+
/* concurrency constructs */
def future[T](body: =>T)(implicit execCtx: ExecutionContext = executionContext): Future[T] =