diff options
author | aleksandar <aleksandar@lampmac14.epfl.ch> | 2012-01-13 17:07:49 +0100 |
---|---|---|
committer | aleksandar <aleksandar@lampmac14.epfl.ch> | 2012-01-13 17:07:49 +0100 |
commit | f8c3f31f2fbf1544723e4cc3fe4af602dab62372 (patch) | |
tree | 83bdcd4f6130bac6b738960c607cc629218813f6 | |
parent | 5d2acb2b3d6b2880ba36f039bbf98c583ce85a21 (diff) | |
download | scala-f8c3f31f2fbf1544723e4cc3fe4af602dab62372.tar.gz scala-f8c3f31f2fbf1544723e4cc3fe4af602dab62372.tar.bz2 scala-f8c3f31f2fbf1544723e4cc3fe4af602dab62372.zip |
Work in progress on porting akka promises and futures.
-rw-r--r-- | src/library/scala/concurrent/Future.scala | 45 | ||||
-rw-r--r-- | src/library/scala/concurrent/Promise.scala | 33 | ||||
-rw-r--r-- | src/library/scala/concurrent/akka/Future.scala | 181 | ||||
-rw-r--r-- | src/library/scala/concurrent/akka/Promise.scala | 63 | ||||
-rw-r--r-- | src/library/scala/concurrent/akka/package.scala | 36 | ||||
-rw-r--r-- | src/library/scala/concurrent/default/TaskImpl.scala | 45 | ||||
-rw-r--r-- | src/library/scala/concurrent/package.scala | 12 |
7 files changed, 231 insertions, 184 deletions
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index d074dbfaaa..4002239fc4 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -179,8 +179,10 @@ self => val p = newPromise[U] onComplete { - case Left(t) => if (pf isDefinedAt t) p success pf(t) else p failure t - case Right(v) => p success v + case Left(t) if pf isDefinedAt t => + try { p success pf(t) } + catch { case t: Throwable => p complete resolver(t) } + case otherwise => p complete otherwise } p.future @@ -206,7 +208,11 @@ self => onComplete { case Left(t) => p failure t - case Right(v) => p success f(v) + case Right(v) => + try p success f(v) + catch { + case t => p complete resolver(t) + } } p.future @@ -224,10 +230,15 @@ self => onComplete { case Left(t) => p failure t - case Right(v) => f(v) onComplete { - case Left(t) => p failure t - case Right(v) => p success v - } + case Right(v) => + try { + f(v) onComplete { + case Left(t) => p failure t + case Right(v) => p success v + } + } catch { + case t: Throwable => p complete resolver(t) + } } p.future @@ -254,7 +265,13 @@ self => onComplete { case Left(t) => p failure t - case Right(v) => if (pred(v)) p success v else p failure new NoSuchElementException("Future.filter predicate is not satisfied by: " + v) + case Right(v) => + try { + if (pred(v)) p success v + else p failure new NoSuchElementException("Future.filter predicate is not satisfied by: " + v) + } catch { + case t: Throwable => p complete resolver(t) + } } p.future @@ -285,7 +302,13 @@ self => onComplete { case Left(t) => p failure t - case Right(v) => if (pf.isDefinedAt(v)) p success pf(v) else p failure new NoSuchElementException("Future.collect partial function is not defined at: " + v) + case Right(v) => + try { + if (pf.isDefinedAt(v)) p success pf(v) + else p failure new NoSuchElementException("Future.collect partial function is not defined at: " + v) + } catch { + case t: Throwable => p complete resolver(t) + } } p.future @@ -295,6 +318,8 @@ self => * the result of the `that` future if `that` is completed successfully. * If both futures are failed, the resulting future holds the throwable object of the first future. * + * Using this method will not cause concurrent programs to become nondeterministic. + * * Example: * {{{ * val f = future { sys.error("failed") } @@ -326,7 +351,7 @@ self => * {{{ * val f = future { sys.error("failed") } * val g = future { 5 } - * val h = f orElse g + * val h = f or g * await(0) h // evaluates to either 5 or throws a runtime exception * }}} */ diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala index f6ea252f73..e5557ae1c3 100644 --- a/src/library/scala/concurrent/Promise.scala +++ b/src/library/scala/concurrent/Promise.scala @@ -35,13 +35,42 @@ trait Promise[T] { */ def future: Future[T] + private def throwCompleted = throw new IllegalStateException("Promise already completed.") + + /** Completes the promise with either an exception or a value. + * + * @param result Either the value or the exception to complete the promise with. + * + * $promiseCompletion + */ + def complete(result: Either[Throwable, T]): this.type = if (tryComplete(result)) this else throwCompleted + + /** Tries to complete the promise with either a value or the exception. + * + * $nonDeterministic + * + * @return If the promise has already been completed returns `false`, or `true` otherwise. + */ + def tryComplete(result: Either[Throwable, T]): Boolean + + /** Completes this promise with the specified future, once that future is completed. + * + * @return This promise + */ + final def completeWith(other: Future[T]): this.type = { + other onComplete { + this complete _ + } + this + } + /** Completes the promise with a value. * * @param value The value to complete the promise with. * * $promiseCompletion */ - def success(v: T): this.type = if (trySuccess(v)) this else throw new IllegalStateException("Promise already completed.") + def success(v: T): this.type = if (trySuccess(v)) this else throwCompleted /** Tries to complete the promise with a value. * @@ -59,7 +88,7 @@ trait Promise[T] { * * $promiseCompletion */ - def failure(t: Throwable): this.type = if (tryFailure(t)) this else throw new IllegalStateException("Promise already completed.") + def failure(t: Throwable): this.type = if (tryFailure(t)) this else throwCompleted /** Tries to complete the promise with an exception. * diff --git a/src/library/scala/concurrent/akka/Future.scala b/src/library/scala/concurrent/akka/Future.scala index 2b41c0c62e..c48009554c 100644 --- a/src/library/scala/concurrent/akka/Future.scala +++ b/src/library/scala/concurrent/akka/Future.scala @@ -1,4 +1,4 @@ -/*/* __ *\ +/* __ *\ ** ________ ___ / / ___ Scala API ** ** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** ** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** @@ -10,178 +10,67 @@ package scala.concurrent.akka +import scala.concurrent.{Awaitable, ExecutionContext} +import scala.util.continuations._ -sealed trait Future[+T] extends scala.concurrent.Future with Awaitable[T] { +trait Future[+T] extends scala.concurrent.Future[T] with Awaitable[T] { implicit def executor: ExecutionContext - /** - * For use only within a Future.flow block or another compatible Delimited Continuations reset block. - * - * Returns the result of this Future without blocking, by suspending execution and storing it as a - * continuation until the result is available. + /** For use only within a Future.flow block or another compatible Delimited Continuations reset block. + * + * Returns the result of this Future without blocking, by suspending execution and storing it as a + * continuation until the result is available. */ - def apply(): T @cps[Future[Any]] = shift(this flatMap (_: T ⇒ Future[Any])) + def apply(): T @cps[Future[Any]] = shift(this flatMap (_: T => Future[Any])) - /** - * Tests whether this Future has been completed. + /** Tests whether this Future has been completed. */ final def isCompleted: Boolean = value.isDefined - /** - * The contained value of this Future. Before this Future is completed - * the value will be None. After completion the value will be Some(Right(t)) - * if it contains a valid result, or Some(Left(error)) if it contains - * an exception. + /** The contained value of this Future. Before this Future is completed + * the value will be None. After completion the value will be Some(Right(t)) + * if it contains a valid result, or Some(Left(error)) if it contains + * an exception. */ def value: Option[Either[Throwable, T]] - def onComplete(func: Either[Throwable, T] => Unit): this.type + def onComplete[U](func: Either[Throwable, T] => U): this.type - /** - * Creates a Future that will be the result of the first completed Future of this and the Future that was passed into this. - * This is semantically the same as: Future.firstCompletedOf(Seq(this, that)) + /** Creates a new Future[A] which is completed with this Future's result if + * that conforms to A's erased type or a ClassCastException otherwise. */ - //FIXME implement as the result of any of the Futures, or if both failed, the first failure - def orElse[A >: T](that: Future[A]): Future[A] = Future.firstCompletedOf(List(this, that)) //TODO Optimize - - final def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = { - val future = Promise[A]() + final def mapTo[T](implicit m: Manifest[T]) = { + val p = executor.promise[T] + onComplete { - case Left(e) if pf isDefinedAt e ⇒ future.complete(try { Right(pf(e)) } catch { case x: Exception ⇒ Left(x) }) - case otherwise ⇒ future complete otherwise - } - future - } - - /** - * Creates a new Future by applying a function to the successful result of - * this Future. If this Future is completed with an exception then the new - * Future will also contain this exception. - * Example: - * <pre> - * val future1 = for { - * a: Int <- actor ? "Hello" // returns 5 - * b: String <- actor ? a // returns "10" - * c: String <- actor ? 7 // returns "14" - * } yield b + "-" + c - * </pre> - */ - final def map[A](f: T ⇒ A): Future[A] = { - val future = Promise[A]() - onComplete { - case l: Left[_, _] ⇒ future complete l.asInstanceOf[Either[Throwable, A]] - case Right(res) ⇒ - future complete (try { - Right(f(res)) - } catch { - case e ⇒ - logError("Future.map", e) - Left(e) - }) - } - future - } - - /** - * Creates a new Future[A] which is completed with this Future's result if - * that conforms to A's erased type or a ClassCastException otherwise. - */ - final def mapTo[A](implicit m: Manifest[A]): Future[A] = { - val fa = Promise[A]() - onComplete { - case l: Left[_, _] ⇒ fa complete l.asInstanceOf[Either[Throwable, A]] - case Right(t) ⇒ - fa complete (try { - Right(BoxedType(m.erasure).cast(t).asInstanceOf[A]) + case l @ Left(t) => p complete l.asInstanceOf[Either[Throwable, T]] + case Right(v) => + p complete (try { + Right(boxedType(m.erasure).cast(v).asInstanceOf[T]) } catch { case e: ClassCastException ⇒ Left(e) }) } - fa + + p.future } - /** - * Creates a new Future by applying a function to the successful result of - * this Future, and returns the result of the function as the new Future. - * If this Future is completed with an exception then the new Future will - * also contain this exception. - * Example: - * <pre> - * val future1 = for { - * a: Int <- actor ? "Hello" // returns 5 - * b: String <- actor ? a // returns "10" - * c: String <- actor ? 7 // returns "14" - * } yield b + "-" + c - * </pre> + /** Used by for-comprehensions. */ - final def flatMap[A](f: T ⇒ Future[A]): Future[A] = { - val p = Promise[A]() + final def withFilter(p: T => Boolean) = new FutureWithFilter[T](this, p) - onComplete { - case l: Left[_, _] ⇒ p complete l.asInstanceOf[Either[Throwable, A]] - case Right(r) ⇒ - try { - p completeWith f(r) - } catch { - case e ⇒ - p complete Left(e) - logError("Future.flatMap", e) - } - } - p - } - - /** - * Same as onSuccess { case r => f(r) } but is also used in for-comprehensions - */ - final def foreach(f: T ⇒ Unit): Unit = onComplete { - case Right(r) ⇒ f(r) - case _ ⇒ - } - - /** - * Used by for-comprehensions - */ - final def withFilter(p: T ⇒ Boolean) = new FutureWithFilter[T](this, p) - - final class FutureWithFilter[+A](self: Future[A], p: A ⇒ Boolean) { - def foreach(f: A ⇒ Unit): Unit = self filter p foreach f - def map[B](f: A ⇒ B): Future[B] = self filter p map f - def flatMap[B](f: A ⇒ Future[B]): Future[B] = self filter p flatMap f - def withFilter(q: A ⇒ Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x ⇒ p(x) && q(x)) - } - - /** - * Returns a new Future that will hold the successful result of this Future if it matches - * the given predicate, if it doesn't match, the resulting Future will be a failed Future - * with a MatchError, of if this Future fails, that failure will be propagated to the returned Future - */ - final def filter(pred: T ⇒ Boolean): Future[T] = { - val p = Promise[T]() - onComplete { - case l: Left[_, _] ⇒ p complete l.asInstanceOf[Either[Throwable, T]] - case r @ Right(res) ⇒ p complete (try { - if (pred(res)) r else Left(new MatchError(res)) - } catch { - case e ⇒ - logError("Future.filter", e) - Left(e) - }) - } - p - } - - protected def logError(msg: String, problem: Throwable): Unit = { - executor match { - case m: MessageDispatcher ⇒ m.prerequisites.eventStream.publish(Error(problem, msg, problem.getMessage)) - case other ⇒ problem.printStackTrace() - } + final class FutureWithFilter[+A](self: Future[A], p: A => Boolean) { + def foreach(f: A => Unit): Unit = self filter p foreach f + def map[B](f: A => B) = self filter p map f + def flatMap[B](f: A => Future[B]) = self filter p flatMap f + def withFilter(q: A => Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x ⇒ p(x) && q(x)) } + } -*/ + diff --git a/src/library/scala/concurrent/akka/Promise.scala b/src/library/scala/concurrent/akka/Promise.scala new file mode 100644 index 0000000000..a47dee48e2 --- /dev/null +++ b/src/library/scala/concurrent/akka/Promise.scala @@ -0,0 +1,63 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent.akka + + + +import scala.concurrent.{ExecutionContext, resolver} +import scala.util.continuations._ + + + +trait Promise[T] extends scala.concurrent.Promise[T] with Future[T] { + + // TODO refine answer and return types here from Any to type parameters + + final def <<(value: T): Future[T] @cps[Future[Any]] = shift { + cont: (Future[T] => Future[Any]) => + cont(complete(Right(value))) + } + + final def <<(other: Future[T]): Future[T] @cps[Future[Any]] = shift { + cont: (Future[T] => Future[Any]) => + val p = executor.promise[Any] + val thisPromise = this + + thisPromise completeWith other + thisPromise onComplete { v => + try { + p completeWith cont(thisPromise) + } catch { + case e => p complete resolver(e) + } + } + + p.future + } + + // TODO finish this once we introduce something like dataflow streams + + /* + final def <<(stream: PromiseStreamOut[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] => Future[Any]) => + val fr = executor.promise[Any] + val f = stream.dequeue(this) + f.onComplete { _ => + try { + fr completeWith cont(f) + } catch { + case e => + fr failure e + } + } + fr + } + */ + +} + diff --git a/src/library/scala/concurrent/akka/package.scala b/src/library/scala/concurrent/akka/package.scala new file mode 100644 index 0000000000..59eda5a3b4 --- /dev/null +++ b/src/library/scala/concurrent/akka/package.scala @@ -0,0 +1,36 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent + + + +import java.{lang => jl} + + + +package object akka { + + private val toBoxed = Map[Class[_], Class[_]]( + classOf[Boolean] -> classOf[jl.Boolean], + classOf[Byte] -> classOf[jl.Byte], + classOf[Char] -> classOf[jl.Character], + classOf[Short] -> classOf[jl.Short], + classOf[Int] -> classOf[jl.Integer], + classOf[Long] -> classOf[jl.Long], + classOf[Float] -> classOf[jl.Float], + classOf[Double] -> classOf[jl.Double], + classOf[Unit] -> classOf[scala.runtime.BoxedUnit]) + + def boxedType(c: Class[_]): Class[_] = { + if (c.isPrimitive) toBoxed(c) else c + } + +} + + diff --git a/src/library/scala/concurrent/default/TaskImpl.scala b/src/library/scala/concurrent/default/TaskImpl.scala index 52d2ea8cfb..716b9c02f1 100644 --- a/src/library/scala/concurrent/default/TaskImpl.scala +++ b/src/library/scala/concurrent/default/TaskImpl.scala @@ -11,9 +11,9 @@ import scala.annotation.tailrec private[concurrent] trait Completable[T] { - self: Future[T] => +self: Future[T] => - val executionContext: ExecutionContextImpl + val executor: ExecutionContextImpl type Callback = Either[Throwable, T] => Any @@ -62,9 +62,9 @@ private[concurrent] trait Completable[T] { } private[concurrent] class PromiseImpl[T](context: ExecutionContextImpl) - extends Promise[T] with Future[T] with Completable[T] { +extends Promise[T] with Future[T] with Completable[T] { - val executionContext: scala.concurrent.default.ExecutionContextImpl = context + val executor: scala.concurrent.default.ExecutionContextImpl = context @volatile private var state: State[T] = _ @@ -85,40 +85,35 @@ private[concurrent] class PromiseImpl[T](context: ExecutionContextImpl) case _ => null } - /** Completes the promise with a value. - * - * @param value The value to complete the promise with. - * - * $promiseCompletion - */ - def success(value: T): Unit = { + def tryComplete(r: Either[Throwable, T]) = r match { + case Left(t) => tryFailure(t) + case Right(v) => trySuccess(v) + } + + def trySuccess(value: T): Boolean = { val cbs = tryCompleteState(Success(value)) if (cbs == null) - throw new IllegalStateException + false else { processCallbacks(cbs, Right(value)) this.synchronized { this.notifyAll() } + true } } - /** Completes the promise with an exception. - * - * @param t The throwable to complete the promise with. - * - * $promiseCompletion - */ - def failure(t: Throwable): Unit = { + def tryFailure(t: Throwable): Boolean = { val wrapped = wrap(t) val cbs = tryCompleteState(Failure(wrapped)) if (cbs == null) - throw new IllegalStateException + false else { processCallbacks(cbs, Left(wrapped)) this.synchronized { this.notifyAll() } + true } } @@ -140,9 +135,9 @@ private[concurrent] class PromiseImpl[T](context: ExecutionContextImpl) } private[concurrent] class TaskImpl[T](context: ExecutionContextImpl, body: => T) - extends RecursiveAction with Task[T] with Future[T] with Completable[T] { +extends RecursiveAction with Task[T] with Future[T] with Completable[T] { - val executionContext: ExecutionContextImpl = context + val executor: ExecutionContextImpl = context @volatile private var state: State[T] = _ @@ -179,8 +174,8 @@ private[concurrent] class TaskImpl[T](context: ExecutionContextImpl, body: => T) def start(): Unit = { Thread.currentThread match { - case fj: ForkJoinWorkerThread if fj.getPool eq executionContext.pool => fork() - case _ => executionContext.pool.execute(this) + case fj: ForkJoinWorkerThread if fj.getPool eq executor.pool => fork() + case _ => executor.pool.execute(this) } } @@ -264,7 +259,7 @@ private[concurrent] final class ExecutionContextImpl extends ExecutionContext { // TODO fix the timeout def blockingCall[T](timeout: Timeout, b: Awaitable[T]): T = b match { - case fj: TaskImpl[_] if fj.executionContext.pool eq pool => + case fj: TaskImpl[_] if fj.executor.pool eq pool => fj.await(timeout) case _ => var res: T = null.asInstanceOf[T] diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala index c35ece5668..ce22c53c72 100644 --- a/src/library/scala/concurrent/package.scala +++ b/src/library/scala/concurrent/package.scala @@ -50,12 +50,22 @@ package object concurrent { case _ => true } - private[concurrent] def resolveThrowable[T](source: Either[Throwable, T]): Either[Throwable, T] = source match { + private[concurrent] def resolve[T](source: Either[Throwable, T]): Either[Throwable, T] = source match { case Left(t: scala.runtime.NonLocalReturnControl[_]) => Right(t.value.asInstanceOf[T]) case Left(t: InterruptedException) => Left(new ExecutionException("Boxed InterruptedException", t)) + case Left(e: Error) => throw e case _ => source } + private val resolverFunction: PartialFunction[Throwable, Either[Throwable, _]] = { + case t: scala.runtime.NonLocalReturnControl[_] => Right(t.value) + case t: InterruptedException => Left(new ExecutionException("Boxed InterruptedException", t)) + case e: Error => throw e + case t => Left(t) + } + + private[concurrent] def resolver[T] = resolverFunction.asInstanceOf[PartialFunction[Throwable, Either[Throwable, T]]] + /* concurrency constructs */ def future[T](body: =>T)(implicit execCtx: ExecutionContext = executionContext): Future[T] = |