diff options
Diffstat (limited to 'src/library')
-rw-r--r-- | src/library/scala/concurrent/Future.scala | 28 | ||||
-rw-r--r-- | src/library/scala/concurrent/Promise.scala | 32 | ||||
-rw-r--r-- | src/library/scala/concurrent/impl/Future.scala | 55 | ||||
-rw-r--r-- | src/library/scala/concurrent/impl/Promise.scala | 103 |
4 files changed, 72 insertions, 146 deletions
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 11505e4146..70b3c3dbbb 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -131,10 +131,6 @@ trait Future[+T] extends Awaitable[T] { /* Miscellaneous */ - /** Creates a new promise. - */ - protected def newPromise[S]: Promise[S] - /** Returns whether the future has already been completed with * a value or an exception. * @@ -169,7 +165,7 @@ trait Future[+T] extends Awaitable[T] { * and throws a corresponding exception if the original future fails. */ def failed: Future[Throwable] = { - val p = newPromise[Throwable] + val p = Promise[Throwable]() onComplete { case Left(t) => p success t @@ -198,7 +194,7 @@ trait Future[+T] extends Awaitable[T] { * $forComprehensionExample */ def map[S](f: T => S): Future[S] = { - val p = newPromise[S] + val p = Promise[S]() onComplete { case Left(t) => p failure t @@ -220,7 +216,7 @@ trait Future[+T] extends Awaitable[T] { * $forComprehensionExample */ def flatMap[S](f: T => Future[S]): Future[S] = { - val p = newPromise[S] + val p = Promise[S]() onComplete { case Left(t) => p failure t @@ -255,7 +251,7 @@ trait Future[+T] extends Awaitable[T] { * }}} */ def filter(pred: T => Boolean): Future[T] = { - val p = newPromise[T] + val p = Promise[T]() onComplete { case Left(t) => p failure t @@ -304,7 +300,7 @@ trait Future[+T] extends Awaitable[T] { * }}} */ def collect[S](pf: PartialFunction[T, S]): Future[S] = { - val p = newPromise[S] + val p = Promise[S]() onComplete { case Left(t) => p failure t @@ -333,7 +329,7 @@ trait Future[+T] extends Awaitable[T] { * }}} */ def recover[U >: T](pf: PartialFunction[Throwable, U]): Future[U] = { - val p = newPromise[U] + val p = Promise[U]() onComplete { case Left(t) if pf isDefinedAt t => @@ -359,7 +355,7 @@ trait Future[+T] extends Awaitable[T] { * }}} */ def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]]): Future[U] = { - val p = newPromise[U] + val p = Promise[U]() onComplete { case Left(t) if pf isDefinedAt t => @@ -383,7 +379,7 @@ trait Future[+T] extends Awaitable[T] { * with the throwable stored in `that`. */ def zip[U](that: Future[U]): Future[(T, U)] = { - val p = newPromise[(T, U)] + val p = Promise[(T, U)]() this onComplete { case Left(t) => p failure t @@ -414,7 +410,7 @@ trait Future[+T] extends Awaitable[T] { * }}} */ def fallbackTo[U >: T](that: Future[U]): Future[U] = { - val p = newPromise[U] + val p = Promise[U]() onComplete { case r @ Right(_) ⇒ p complete r case _ ⇒ p completeWith that @@ -430,7 +426,7 @@ trait Future[+T] extends Awaitable[T] { if (c.isPrimitive) Future.toBoxed(c) else c } - val p = newPromise[S] + val p = Promise[S]() onComplete { case l: Left[Throwable, _] => p complete l.asInstanceOf[Either[Throwable, S]] @@ -469,7 +465,7 @@ trait Future[+T] extends Awaitable[T] { * }}} */ def andThen[U](pf: PartialFunction[Either[Throwable, T], U]): Future[T] = { - val p = newPromise[T] + val p = Promise[T]() onComplete { case r => @@ -494,7 +490,7 @@ trait Future[+T] extends Awaitable[T] { * }}} */ def either[U >: T](that: Future[U]): Future[U] = { - val p = newPromise[U] + val p = Promise[U]() val completePromise: PartialFunction[Either[Throwable, U], _] = { case Left(t) => p tryFailure t diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala index cd22a55ce7..f7ec0714cf 100644 --- a/src/library/scala/concurrent/Promise.scala +++ b/src/library/scala/concurrent/Promise.scala @@ -35,7 +35,8 @@ trait Promise[T] { * * $promiseCompletion */ - def complete(result: Either[Throwable, T]): this.type = if (tryComplete(result)) this else throwCompleted + def complete(result: Either[Throwable, T]): this.type = + if (tryComplete(result)) this else throw new IllegalStateException("Promise already completed.") /** Tries to complete the promise with either a value or the exception. * @@ -50,9 +51,16 @@ trait Promise[T] { * @return This promise */ final def completeWith(other: Future[T]): this.type = { - other onComplete { - this complete _ - } + other onComplete { this complete _ } + this + } + + /** Attempts to complete this promise with the specified future, once that future is completed. + * + * @return This promise + */ + final def tryCompleteWith(other: Future[T]): this.type = { + other onComplete { this tryComplete _ } this } @@ -62,7 +70,7 @@ trait Promise[T] { * * $promiseCompletion */ - def success(v: T): this.type = if (trySuccess(v)) this else throwCompleted + def success(v: T): this.type = complete(Right(v)) /** Tries to complete the promise with a value. * @@ -80,7 +88,7 @@ trait Promise[T] { * * $promiseCompletion */ - def failure(t: Throwable): this.type = if (tryFailure(t)) this else throwCompleted + def failure(t: Throwable): this.type = complete(Left(t)) /** Tries to complete the promise with an exception. * @@ -89,18 +97,6 @@ trait Promise[T] { * @return If the promise has already been completed returns `false`, or `true` otherwise. */ def tryFailure(t: Throwable): Boolean = tryComplete(Left(t)) - - /** Wraps a `Throwable` in an `ExecutionException` if necessary. TODO replace with `resolver` from scala.concurrent - * - * $allowedThrowables - */ - protected def wrap(t: Throwable): Throwable = t match { - case t: Throwable if isFutureThrowable(t) => t - case _ => new ExecutionException(t) - } - - private def throwCompleted = throw new IllegalStateException("Promise already completed.") - } diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala index 548524c9fe..20d4122e8f 100644 --- a/src/library/scala/concurrent/impl/Future.scala +++ b/src/library/scala/concurrent/impl/Future.scala @@ -19,29 +19,23 @@ private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awa implicit def executor: ExecutionContext - /** For use only within a Future.flow block or another compatible Delimited Continuations reset block. - * - * Returns the result of this Future without blocking, by suspending execution and storing it as a - * continuation until the result is available. - */ - //def apply(): T @cps[Future[Any]] = shift(this flatMap (_: T => Future[Any])) - - /** Tests whether this Future has been completed. - */ - def isCompleted: Boolean - - /** The contained value of this Future. Before this Future is completed - * the value will be None. After completion the value will be Some(Right(t)) - * if it contains a valid result, or Some(Left(error)) if it contains - * an exception. - */ - def value: Option[Either[Throwable, T]] - - def onComplete[U](func: Either[Throwable, T] => U): this.type - } -object Future { +private[concurrent] object Future { + import java.{ lang => jl } + + private val toBoxed = Map[Class[_], Class[_]]( + classOf[Boolean] -> classOf[jl.Boolean], + classOf[Byte] -> classOf[jl.Byte], + classOf[Char] -> classOf[jl.Character], + classOf[Short] -> classOf[jl.Short], + classOf[Int] -> classOf[jl.Integer], + classOf[Long] -> classOf[jl.Long], + classOf[Float] -> classOf[jl.Float], + classOf[Double] -> classOf[jl.Double], + classOf[Unit] -> classOf[scala.runtime.BoxedUnit] + ) + /** Wraps a block of code into an awaitable object. */ private[concurrent] def body2awaitable[T](body: =>T) = new Awaitable[T] { def ready(atMost: Duration)(implicit permit: CanAwait) = { @@ -51,24 +45,23 @@ object Future { def result(atMost: Duration)(implicit permit: CanAwait) = body } + def boxedType(c: Class[_]): Class[_] = if (c.isPrimitive) toBoxed(c) else c + def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = { val promise = new Promise.DefaultPromise[T]() //TODO: use `dispatchFuture`? executor.execute(new Runnable { - def run = { - promise complete { - try { - Right(body) - } catch { - case NonFatal(e) => - // Commenting out reporting for now, since it produces too much output in the tests - //executor.reportFailure(e) - scala.concurrent.resolver(e) - } + def run = promise complete { + try Right(body) catch { + case NonFatal(e) => + // Commenting out reporting for now, since it produces too much output in the tests + //executor.reportFailure(e) + scala.concurrent.resolver(e) } } }) + promise.future } diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index b508c3b2e9..da70b3dea5 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -21,54 +21,7 @@ import scala.annotation.tailrec private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with Future[T] { - - def future = this - - def newPromise[S]: scala.concurrent.Promise[S] = new Promise.DefaultPromise() - - // TODO refine answer and return types here from Any to type parameters - // then move this up in the hierarchy - /* - final def <<(value: T): Future[T] @cps[Future[Any]] = shift { - cont: (Future[T] => Future[Any]) => - cont(complete(Right(value))) - } - - final def <<(other: Future[T]): Future[T] @cps[Future[Any]] = shift { - cont: (Future[T] => Future[Any]) => - val p = executor.promise[Any] - val thisPromise = this - - thisPromise completeWith other - thisPromise onComplete { v => - try { - p completeWith cont(thisPromise) - } catch { - case e => p complete resolver(e) - } - } - - p.future - } - */ - // TODO finish this once we introduce something like dataflow streams - - /* - final def <<(stream: PromiseStreamOut[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] => Future[Any]) => - val fr = executor.promise[Any] - val f = stream.dequeue(this) - f.onComplete { _ => - try { - fr completeWith cont(f) - } catch { - case e => - fr failure e - } - } - fr - } - */ - + def future: this.type = this } @@ -124,48 +77,36 @@ object Promise { } def tryComplete(value: Either[Throwable, T]): Boolean = { - val callbacks: List[Either[Throwable, T] => Unit] = { - try { - @tailrec - def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] => Unit] = { - getState match { - case raw: List[_] => - val cur = raw.asInstanceOf[List[Either[Throwable, T] => Unit]] - if (updateState(cur, v)) cur else tryComplete(v) - case _ => null - } - } - tryComplete(resolveEither(value)) - } finally { - synchronized { //Notify any evil blockers - notifyAll() + val resolved = resolveEither(value) + (try { + @tailrec + def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] => Unit] = { + getState match { + case raw: List[_] => + val cur = raw.asInstanceOf[List[Either[Throwable, T] => Unit]] + if (updateState(cur, v)) cur else tryComplete(v) + case _ => null } } - } - - callbacks match { + tryComplete(resolved) + } finally { + synchronized { notifyAll() } //Notify any evil blockers + }) match { case null => false case cs if cs.isEmpty => true - case cs => Future.dispatchFuture(executor, () => cs.foreach(f => notifyCompleted(f, value))); true + case cs => Future.dispatchFuture(executor, () => cs.foreach(f => notifyCompleted(f, resolved))); true } } def onComplete[U](func: Either[Throwable, T] => U): this.type = { - @tailrec //Returns the future's results if it has already been completed, or null otherwise. - def tryAddCallback(): Either[Throwable, T] = { - val cur = getState - cur match { - case r: Either[_, _] => r.asInstanceOf[Either[Throwable, T]] - case listeners: List[_] => if (updateState(listeners, func :: listeners)) null else tryAddCallback() + @tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed + def dispatchOrAddCallback(): Unit = + getState match { + case r: Either[_, _] => Future.dispatchFuture(executor, () => notifyCompleted(func, r.asInstanceOf[Either[Throwable, T]])) + case listeners: List[_] => if (updateState(listeners, func :: listeners)) () else dispatchOrAddCallback() } - } - - tryAddCallback() match { - case null => this - case completed => - Future.dispatchFuture(executor, () => notifyCompleted(func, completed)) - this - } + dispatchOrAddCallback() + this } private final def notifyCompleted(func: Either[Throwable, T] => Any, result: Either[Throwable, T]) { |