From 3767a6a83efc74d34e9025f798eeb2a043e6df8d Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 10 Sep 2012 15:52:07 +0200 Subject: fix usage of Duration in Promise impl - correctly treat MinusInf and Undefined - don't toMillis in the timeout message (could be MinusInf) - also notice that Inf did not actually wait unbounded - and further notice that tryAwait swallows InterruptedException instead of bailing out early => changed to do so and added throws annotation - also removed some unused imports of Duration --- src/library/scala/concurrent/Awaitable.scala | 5 ++++ src/library/scala/concurrent/BlockContext.scala | 3 -- .../scala/concurrent/ExecutionContext.scala | 1 - src/library/scala/concurrent/Future.scala | 1 - .../concurrent/impl/ExecutionContextImpl.scala | 1 - src/library/scala/concurrent/impl/Promise.scala | 33 ++++++++++++++++------ src/library/scala/concurrent/package.scala | 3 ++ 7 files changed, 32 insertions(+), 15 deletions(-) (limited to 'src') diff --git a/src/library/scala/concurrent/Awaitable.scala b/src/library/scala/concurrent/Awaitable.scala index 99bdfbc5a9..2205dd9869 100644 --- a/src/library/scala/concurrent/Awaitable.scala +++ b/src/library/scala/concurrent/Awaitable.scala @@ -18,13 +18,18 @@ trait Awaitable[+T] { /** * Should throw [[scala.concurrent.TimeoutException]] if it times out * This method should not be called directly. + * + * @throws InterruptedException if the wait call was interrupted */ @throws(classOf[TimeoutException]) + @throws(classOf[InterruptedException]) def ready(atMost: Duration)(implicit permit: CanAwait): this.type /** * Throws exceptions if it cannot produce a T within the specified time. * This method should not be called directly. + * + * @throws InterruptedException if the wait call was interrupted */ @throws(classOf[Exception]) def result(atMost: Duration)(implicit permit: CanAwait): T diff --git a/src/library/scala/concurrent/BlockContext.scala b/src/library/scala/concurrent/BlockContext.scala index 640560a174..83333a9e94 100644 --- a/src/library/scala/concurrent/BlockContext.scala +++ b/src/library/scala/concurrent/BlockContext.scala @@ -8,9 +8,6 @@ package scala.concurrent -import java.lang.Thread -import scala.concurrent.util.Duration - /** * A context to be notified by `scala.concurrent.blocking` when * a thread is about to block. In effect this trait provides diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala index 1be6050303..844ec14241 100644 --- a/src/library/scala/concurrent/ExecutionContext.scala +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -10,7 +10,6 @@ package scala.concurrent import java.util.concurrent.{ ExecutorService, Executor } -import scala.concurrent.util.Duration import scala.annotation.implicitNotFound import scala.util.Try diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index bc0b437a33..a5d9cdd5d1 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -16,7 +16,6 @@ import java.lang.{ Iterable => JIterable } import java.util.{ LinkedList => JLinkedList } import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean } -import scala.concurrent.util.Duration import scala.util.control.NonFatal import scala.Option import scala.util.{Try, Success, Failure} diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala index 875a558887..c517a05a81 100644 --- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala +++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala @@ -14,7 +14,6 @@ import java.util.concurrent.{ LinkedBlockingQueue, Callable, Executor, ExecutorS import java.util.Collection import scala.concurrent.forkjoin._ import scala.concurrent.{ BlockContext, ExecutionContext, Awaitable, CanAwait, ExecutionContextExecutor, ExecutionContextExecutorService } -import scala.concurrent.util.Duration import scala.util.control.NonFatal diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index b19bed004b..35aac974ec 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -65,29 +65,44 @@ private[concurrent] object Promise { protected final def tryAwait(atMost: Duration): Boolean = { @tailrec def awaitUnsafe(waitTimeNanos: Long): Boolean = { - if (value.isEmpty && waitTimeNanos > 0) { + if (!isCompleted && waitTimeNanos > 0) { val ms = NANOSECONDS.toMillis(waitTimeNanos) val ns = (waitTimeNanos % 1000000l).toInt // as per object.wait spec val start = System.nanoTime() - try { - synchronized { - if (!isCompleted) wait(ms, ns) // previously - this was a `while`, ending up in an infinite loop - } - } catch { - case e: InterruptedException => + synchronized { + if (!isCompleted) wait(ms, ns) // previously - this was a `while`, ending up in an infinite loop } awaitUnsafe(waitTimeNanos - (System.nanoTime() - start)) } else isCompleted } - awaitUnsafe(if (atMost.isFinite) atMost.toNanos else Long.MaxValue) + @tailrec + def awaitUnbounded(): Boolean = { + if (isCompleted) true + else { + synchronized { + if (!isCompleted) wait() + } + awaitUnbounded() + } + } + + if (atMost <= Duration.Zero) + isCompleted + else if (atMost eq Duration.Undefined) + throw new IllegalArgumentException("cannot wait for Undefined period") + else if (atMost == Duration.Inf) + awaitUnbounded() + else + awaitUnsafe(atMost.toNanos) } @throws(classOf[TimeoutException]) + @throws(classOf[InterruptedException]) def ready(atMost: Duration)(implicit permit: CanAwait): this.type = if (isCompleted || tryAwait(atMost)) this - else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds") + else throw new TimeoutException("Futures timed out after [" + atMost + "]") @throws(classOf[Exception]) def result(atMost: Duration)(implicit permit: CanAwait): T = diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala index a2ef42fac8..3681109653 100644 --- a/src/library/scala/concurrent/package.scala +++ b/src/library/scala/concurrent/package.scala @@ -74,8 +74,10 @@ package concurrent { * @param awaitable the `Awaitable` on which `ready` is to be called * @param atMost the maximum timeout for which to wait * @return the result of `awaitable.ready` which is defined to be the awaitable itself. + * @throws InterruptedException if the wait call was interrupted */ @throws(classOf[TimeoutException]) + @throws(classOf[InterruptedException]) def ready[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type = blocking(awaitable.ready(atMost)(AwaitPermission)) @@ -87,6 +89,7 @@ package concurrent { * @param awaitable the `Awaitable` on which `result` is to be called * @param atMost the maximum timeout for which to wait * @return the result of `awaitable.result` + * @throws InterruptedException if the wait call was interrupted */ @throws(classOf[Exception]) def result[T](awaitable: Awaitable[T], atMost: Duration): T = -- cgit v1.2.3