diff options
Diffstat (limited to 'src/library/scala/concurrent/impl/Promise.scala')
-rw-r--r-- | src/library/scala/concurrent/impl/Promise.scala | 29 |
1 files changed, 15 insertions, 14 deletions
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index 4a983b5001..c79b0d02cc 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -26,7 +26,7 @@ private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with Fu def future = this - def newPromise[S]: Promise[S] = executor promise + def newPromise[S]: scala.concurrent.Promise[S] = new Promise.DefaultPromise() // TODO refine answer and return types here from Any to type parameters // then move this up in the hierarchy @@ -75,6 +75,7 @@ private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with Fu object Promise { + def dur2long(dur: Duration): Long = if (dur.isFinite) dur.toNanos else Long.MaxValue def EmptyPending[T](): FState[T] = emptyPendingValue.asInstanceOf[FState[T]] @@ -101,7 +102,7 @@ object Promise { /** Default promise implementation. */ - class DefaultPromise[T](implicit val executor: ExecutionContextImpl) extends AbstractPromise with Promise[T] { + class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] { self => updater.set(this, Promise.EmptyPending()) @@ -126,14 +127,14 @@ object Promise { value.isDefined } - executor.blocking(concurrent.body2awaitable(awaitUnsafe(dur2long(atMost))), Duration.fromNanos(0)) + executor.blocking(concurrent.body2awaitable(awaitUnsafe(dur2long(atMost))), atMost) } - private def ready(atMost: Duration)(implicit permit: CanAwait): this.type = + def ready(atMost: Duration)(implicit permit: CanAwait): this.type = if (value.isDefined || tryAwait(atMost)) this else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds") - def await(atMost: Duration)(implicit permit: CanAwait): T = + def result(atMost: Duration)(implicit permit: CanAwait): T = ready(atMost).value.get match { case util.Failure(e) => throw e case util.Success(r) => r @@ -176,9 +177,9 @@ object Promise { case null => false case cs if cs.isEmpty => true case cs => - executor dispatchFuture { + Future.dispatchFuture(executor, { () => cs.foreach(f => notifyCompleted(f, value)) - } + }) true } } @@ -197,9 +198,9 @@ object Promise { if (tryAddCallback()) { val result = value.get - executor dispatchFuture { + Future.dispatchFuture(executor, { () => notifyCompleted(func, result) - } + }) } this @@ -218,22 +219,22 @@ object Promise { * * Useful in Future-composition when a value to contribute is already available. */ - final class KeptPromise[T](suppliedValue: Try[T])(implicit val executor: ExecutionContextImpl) extends Promise[T] { + final class KeptPromise[T](suppliedValue: Try[T])(implicit val executor: ExecutionContext) extends Promise[T] { val value = Some(resolve(suppliedValue)) def tryComplete(value: Try[T]): Boolean = false def onComplete[U](func: Try[T] => U): this.type = { val completedAs = value.get - executor dispatchFuture { + Future.dispatchFuture(executor, { () => func(completedAs) - } + }) this } - private def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this + def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this - def await(atMost: Duration)(implicit permit: CanAwait): T = value.get match { + def result(atMost: Duration)(implicit permit: CanAwait): T = value.get match { case util.Failure(e) => throw e case util.Success(r) => r } |