summaryrefslogtreecommitdiff
path: root/src/library
diff options
context:
space:
mode:
Diffstat (limited to 'src/library')
-rw-r--r--src/library/scala/concurrent/Future.scala28
-rw-r--r--src/library/scala/concurrent/Promise.scala32
-rw-r--r--src/library/scala/concurrent/impl/Future.scala55
-rw-r--r--src/library/scala/concurrent/impl/Promise.scala103
4 files changed, 72 insertions, 146 deletions
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index 11505e4146..70b3c3dbbb 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -131,10 +131,6 @@ trait Future[+T] extends Awaitable[T] {
/* Miscellaneous */
- /** Creates a new promise.
- */
- protected def newPromise[S]: Promise[S]
-
/** Returns whether the future has already been completed with
* a value or an exception.
*
@@ -169,7 +165,7 @@ trait Future[+T] extends Awaitable[T] {
* and throws a corresponding exception if the original future fails.
*/
def failed: Future[Throwable] = {
- val p = newPromise[Throwable]
+ val p = Promise[Throwable]()
onComplete {
case Left(t) => p success t
@@ -198,7 +194,7 @@ trait Future[+T] extends Awaitable[T] {
* $forComprehensionExample
*/
def map[S](f: T => S): Future[S] = {
- val p = newPromise[S]
+ val p = Promise[S]()
onComplete {
case Left(t) => p failure t
@@ -220,7 +216,7 @@ trait Future[+T] extends Awaitable[T] {
* $forComprehensionExample
*/
def flatMap[S](f: T => Future[S]): Future[S] = {
- val p = newPromise[S]
+ val p = Promise[S]()
onComplete {
case Left(t) => p failure t
@@ -255,7 +251,7 @@ trait Future[+T] extends Awaitable[T] {
* }}}
*/
def filter(pred: T => Boolean): Future[T] = {
- val p = newPromise[T]
+ val p = Promise[T]()
onComplete {
case Left(t) => p failure t
@@ -304,7 +300,7 @@ trait Future[+T] extends Awaitable[T] {
* }}}
*/
def collect[S](pf: PartialFunction[T, S]): Future[S] = {
- val p = newPromise[S]
+ val p = Promise[S]()
onComplete {
case Left(t) => p failure t
@@ -333,7 +329,7 @@ trait Future[+T] extends Awaitable[T] {
* }}}
*/
def recover[U >: T](pf: PartialFunction[Throwable, U]): Future[U] = {
- val p = newPromise[U]
+ val p = Promise[U]()
onComplete {
case Left(t) if pf isDefinedAt t =>
@@ -359,7 +355,7 @@ trait Future[+T] extends Awaitable[T] {
* }}}
*/
def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]]): Future[U] = {
- val p = newPromise[U]
+ val p = Promise[U]()
onComplete {
case Left(t) if pf isDefinedAt t =>
@@ -383,7 +379,7 @@ trait Future[+T] extends Awaitable[T] {
* with the throwable stored in `that`.
*/
def zip[U](that: Future[U]): Future[(T, U)] = {
- val p = newPromise[(T, U)]
+ val p = Promise[(T, U)]()
this onComplete {
case Left(t) => p failure t
@@ -414,7 +410,7 @@ trait Future[+T] extends Awaitable[T] {
* }}}
*/
def fallbackTo[U >: T](that: Future[U]): Future[U] = {
- val p = newPromise[U]
+ val p = Promise[U]()
onComplete {
case r @ Right(_) ⇒ p complete r
case _ ⇒ p completeWith that
@@ -430,7 +426,7 @@ trait Future[+T] extends Awaitable[T] {
if (c.isPrimitive) Future.toBoxed(c) else c
}
- val p = newPromise[S]
+ val p = Promise[S]()
onComplete {
case l: Left[Throwable, _] => p complete l.asInstanceOf[Either[Throwable, S]]
@@ -469,7 +465,7 @@ trait Future[+T] extends Awaitable[T] {
* }}}
*/
def andThen[U](pf: PartialFunction[Either[Throwable, T], U]): Future[T] = {
- val p = newPromise[T]
+ val p = Promise[T]()
onComplete {
case r =>
@@ -494,7 +490,7 @@ trait Future[+T] extends Awaitable[T] {
* }}}
*/
def either[U >: T](that: Future[U]): Future[U] = {
- val p = newPromise[U]
+ val p = Promise[U]()
val completePromise: PartialFunction[Either[Throwable, U], _] = {
case Left(t) => p tryFailure t
diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala
index cd22a55ce7..f7ec0714cf 100644
--- a/src/library/scala/concurrent/Promise.scala
+++ b/src/library/scala/concurrent/Promise.scala
@@ -35,7 +35,8 @@ trait Promise[T] {
*
* $promiseCompletion
*/
- def complete(result: Either[Throwable, T]): this.type = if (tryComplete(result)) this else throwCompleted
+ def complete(result: Either[Throwable, T]): this.type =
+ if (tryComplete(result)) this else throw new IllegalStateException("Promise already completed.")
/** Tries to complete the promise with either a value or the exception.
*
@@ -50,9 +51,16 @@ trait Promise[T] {
* @return This promise
*/
final def completeWith(other: Future[T]): this.type = {
- other onComplete {
- this complete _
- }
+ other onComplete { this complete _ }
+ this
+ }
+
+ /** Attempts to complete this promise with the specified future, once that future is completed.
+ *
+ * @return This promise
+ */
+ final def tryCompleteWith(other: Future[T]): this.type = {
+ other onComplete { this tryComplete _ }
this
}
@@ -62,7 +70,7 @@ trait Promise[T] {
*
* $promiseCompletion
*/
- def success(v: T): this.type = if (trySuccess(v)) this else throwCompleted
+ def success(v: T): this.type = complete(Right(v))
/** Tries to complete the promise with a value.
*
@@ -80,7 +88,7 @@ trait Promise[T] {
*
* $promiseCompletion
*/
- def failure(t: Throwable): this.type = if (tryFailure(t)) this else throwCompleted
+ def failure(t: Throwable): this.type = complete(Left(t))
/** Tries to complete the promise with an exception.
*
@@ -89,18 +97,6 @@ trait Promise[T] {
* @return If the promise has already been completed returns `false`, or `true` otherwise.
*/
def tryFailure(t: Throwable): Boolean = tryComplete(Left(t))
-
- /** Wraps a `Throwable` in an `ExecutionException` if necessary. TODO replace with `resolver` from scala.concurrent
- *
- * $allowedThrowables
- */
- protected def wrap(t: Throwable): Throwable = t match {
- case t: Throwable if isFutureThrowable(t) => t
- case _ => new ExecutionException(t)
- }
-
- private def throwCompleted = throw new IllegalStateException("Promise already completed.")
-
}
diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala
index 548524c9fe..20d4122e8f 100644
--- a/src/library/scala/concurrent/impl/Future.scala
+++ b/src/library/scala/concurrent/impl/Future.scala
@@ -19,29 +19,23 @@ private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awa
implicit def executor: ExecutionContext
- /** For use only within a Future.flow block or another compatible Delimited Continuations reset block.
- *
- * Returns the result of this Future without blocking, by suspending execution and storing it as a
- * continuation until the result is available.
- */
- //def apply(): T @cps[Future[Any]] = shift(this flatMap (_: T => Future[Any]))
-
- /** Tests whether this Future has been completed.
- */
- def isCompleted: Boolean
-
- /** The contained value of this Future. Before this Future is completed
- * the value will be None. After completion the value will be Some(Right(t))
- * if it contains a valid result, or Some(Left(error)) if it contains
- * an exception.
- */
- def value: Option[Either[Throwable, T]]
-
- def onComplete[U](func: Either[Throwable, T] => U): this.type
-
}
-object Future {
+private[concurrent] object Future {
+ import java.{ lang => jl }
+
+ private val toBoxed = Map[Class[_], Class[_]](
+ classOf[Boolean] -> classOf[jl.Boolean],
+ classOf[Byte] -> classOf[jl.Byte],
+ classOf[Char] -> classOf[jl.Character],
+ classOf[Short] -> classOf[jl.Short],
+ classOf[Int] -> classOf[jl.Integer],
+ classOf[Long] -> classOf[jl.Long],
+ classOf[Float] -> classOf[jl.Float],
+ classOf[Double] -> classOf[jl.Double],
+ classOf[Unit] -> classOf[scala.runtime.BoxedUnit]
+ )
+
/** Wraps a block of code into an awaitable object. */
private[concurrent] def body2awaitable[T](body: =>T) = new Awaitable[T] {
def ready(atMost: Duration)(implicit permit: CanAwait) = {
@@ -51,24 +45,23 @@ object Future {
def result(atMost: Duration)(implicit permit: CanAwait) = body
}
+ def boxedType(c: Class[_]): Class[_] = if (c.isPrimitive) toBoxed(c) else c
+
def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = {
val promise = new Promise.DefaultPromise[T]()
//TODO: use `dispatchFuture`?
executor.execute(new Runnable {
- def run = {
- promise complete {
- try {
- Right(body)
- } catch {
- case NonFatal(e) =>
- // Commenting out reporting for now, since it produces too much output in the tests
- //executor.reportFailure(e)
- scala.concurrent.resolver(e)
- }
+ def run = promise complete {
+ try Right(body) catch {
+ case NonFatal(e) =>
+ // Commenting out reporting for now, since it produces too much output in the tests
+ //executor.reportFailure(e)
+ scala.concurrent.resolver(e)
}
}
})
+
promise.future
}
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala
index b508c3b2e9..da70b3dea5 100644
--- a/src/library/scala/concurrent/impl/Promise.scala
+++ b/src/library/scala/concurrent/impl/Promise.scala
@@ -21,54 +21,7 @@ import scala.annotation.tailrec
private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with Future[T] {
-
- def future = this
-
- def newPromise[S]: scala.concurrent.Promise[S] = new Promise.DefaultPromise()
-
- // TODO refine answer and return types here from Any to type parameters
- // then move this up in the hierarchy
- /*
- final def <<(value: T): Future[T] @cps[Future[Any]] = shift {
- cont: (Future[T] => Future[Any]) =>
- cont(complete(Right(value)))
- }
-
- final def <<(other: Future[T]): Future[T] @cps[Future[Any]] = shift {
- cont: (Future[T] => Future[Any]) =>
- val p = executor.promise[Any]
- val thisPromise = this
-
- thisPromise completeWith other
- thisPromise onComplete { v =>
- try {
- p completeWith cont(thisPromise)
- } catch {
- case e => p complete resolver(e)
- }
- }
-
- p.future
- }
- */
- // TODO finish this once we introduce something like dataflow streams
-
- /*
- final def <<(stream: PromiseStreamOut[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] => Future[Any]) =>
- val fr = executor.promise[Any]
- val f = stream.dequeue(this)
- f.onComplete { _ =>
- try {
- fr completeWith cont(f)
- } catch {
- case e =>
- fr failure e
- }
- }
- fr
- }
- */
-
+ def future: this.type = this
}
@@ -124,48 +77,36 @@ object Promise {
}
def tryComplete(value: Either[Throwable, T]): Boolean = {
- val callbacks: List[Either[Throwable, T] => Unit] = {
- try {
- @tailrec
- def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] => Unit] = {
- getState match {
- case raw: List[_] =>
- val cur = raw.asInstanceOf[List[Either[Throwable, T] => Unit]]
- if (updateState(cur, v)) cur else tryComplete(v)
- case _ => null
- }
- }
- tryComplete(resolveEither(value))
- } finally {
- synchronized { //Notify any evil blockers
- notifyAll()
+ val resolved = resolveEither(value)
+ (try {
+ @tailrec
+ def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] => Unit] = {
+ getState match {
+ case raw: List[_] =>
+ val cur = raw.asInstanceOf[List[Either[Throwable, T] => Unit]]
+ if (updateState(cur, v)) cur else tryComplete(v)
+ case _ => null
}
}
- }
-
- callbacks match {
+ tryComplete(resolved)
+ } finally {
+ synchronized { notifyAll() } //Notify any evil blockers
+ }) match {
case null => false
case cs if cs.isEmpty => true
- case cs => Future.dispatchFuture(executor, () => cs.foreach(f => notifyCompleted(f, value))); true
+ case cs => Future.dispatchFuture(executor, () => cs.foreach(f => notifyCompleted(f, resolved))); true
}
}
def onComplete[U](func: Either[Throwable, T] => U): this.type = {
- @tailrec //Returns the future's results if it has already been completed, or null otherwise.
- def tryAddCallback(): Either[Throwable, T] = {
- val cur = getState
- cur match {
- case r: Either[_, _] => r.asInstanceOf[Either[Throwable, T]]
- case listeners: List[_] => if (updateState(listeners, func :: listeners)) null else tryAddCallback()
+ @tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed
+ def dispatchOrAddCallback(): Unit =
+ getState match {
+ case r: Either[_, _] => Future.dispatchFuture(executor, () => notifyCompleted(func, r.asInstanceOf[Either[Throwable, T]]))
+ case listeners: List[_] => if (updateState(listeners, func :: listeners)) () else dispatchOrAddCallback()
}
- }
-
- tryAddCallback() match {
- case null => this
- case completed =>
- Future.dispatchFuture(executor, () => notifyCompleted(func, completed))
- this
- }
+ dispatchOrAddCallback()
+ this
}
private final def notifyCompleted(func: Either[Throwable, T] => Any, result: Either[Throwable, T]) {