diff options
-rw-r--r-- | src/library/scala/concurrent/Awaitable.scala | 23 | ||||
-rw-r--r-- | src/library/scala/concurrent/BlockContext.scala | 3 | ||||
-rw-r--r-- | src/library/scala/concurrent/ExecutionContext.scala | 1 | ||||
-rw-r--r-- | src/library/scala/concurrent/Future.scala | 1 | ||||
-rw-r--r-- | src/library/scala/concurrent/impl/ExecutionContextImpl.scala | 1 | ||||
-rw-r--r-- | src/library/scala/concurrent/impl/Promise.scala | 43 | ||||
-rw-r--r-- | src/library/scala/concurrent/package.scala | 33 | ||||
-rw-r--r-- | src/partest/scala/tools/partest/TestUtil.scala | 12 | ||||
-rw-r--r-- | test/files/jvm/scala-concurrent-tck.scala | 29 |
9 files changed, 110 insertions, 36 deletions
diff --git a/src/library/scala/concurrent/Awaitable.scala b/src/library/scala/concurrent/Awaitable.scala index 99bdfbc5a9..655115349a 100644 --- a/src/library/scala/concurrent/Awaitable.scala +++ b/src/library/scala/concurrent/Awaitable.scala @@ -16,15 +16,34 @@ import scala.concurrent.util.Duration trait Awaitable[+T] { /** - * Should throw [[scala.concurrent.TimeoutException]] if it times out + * Await the "resolved" state of this Awaitable. * This method should not be called directly. + * + * @param atMost + * maximum wait time, which may be negative (no waiting is done), + * [[Duration.Inf]] for unbounded waiting, or a finite positive + * duration + * @return the Awaitable itself + * @throws InterruptedException if the wait call was interrupted + * @throws TimeoutException if after waiting for the specified time this Awaitable is still not ready + * @throws IllegalArgumentException if `atMost` is [[Duration.Undefined]] */ @throws(classOf[TimeoutException]) + @throws(classOf[InterruptedException]) def ready(atMost: Duration)(implicit permit: CanAwait): this.type /** - * Throws exceptions if it cannot produce a T within the specified time. + * Await and return the result of this Awaitable, which is either of type T or a thrown exception (any Throwable). * This method should not be called directly. + * + * @param atMost + * maximum wait time, which may be negative (no waiting is done), + * [[Duration.Inf]] for unbounded waiting, or a finite positive + * duration + * @return the value if the Awaitable was successful within the specific maximum wait time + * @throws InterruptedException if the wait call was interrupted + * @throws TimeoutException if after waiting for the specified time this Awaitable is still not ready + * @throws IllegalArgumentException if `atMost` is [[Duration.Undefined]] */ @throws(classOf[Exception]) def result(atMost: Duration)(implicit permit: CanAwait): T diff --git a/src/library/scala/concurrent/BlockContext.scala b/src/library/scala/concurrent/BlockContext.scala index 640560a174..83333a9e94 100644 --- a/src/library/scala/concurrent/BlockContext.scala +++ b/src/library/scala/concurrent/BlockContext.scala @@ -8,9 +8,6 @@ package scala.concurrent -import java.lang.Thread -import scala.concurrent.util.Duration - /** * A context to be notified by `scala.concurrent.blocking` when * a thread is about to block. In effect this trait provides diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala index 1be6050303..844ec14241 100644 --- a/src/library/scala/concurrent/ExecutionContext.scala +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -10,7 +10,6 @@ package scala.concurrent import java.util.concurrent.{ ExecutorService, Executor } -import scala.concurrent.util.Duration import scala.annotation.implicitNotFound import scala.util.Try diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index bc0b437a33..a5d9cdd5d1 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -16,7 +16,6 @@ import java.lang.{ Iterable => JIterable } import java.util.{ LinkedList => JLinkedList } import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean } -import scala.concurrent.util.Duration import scala.util.control.NonFatal import scala.Option import scala.util.{Try, Success, Failure} diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala index 875a558887..c517a05a81 100644 --- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala +++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala @@ -14,7 +14,6 @@ import java.util.concurrent.{ LinkedBlockingQueue, Callable, Executor, ExecutorS import java.util.Collection import scala.concurrent.forkjoin._ import scala.concurrent.{ BlockContext, ExecutionContext, Awaitable, CanAwait, ExecutionContextExecutor, ExecutionContextExecutorService } -import scala.concurrent.util.Duration import scala.util.control.NonFatal diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index b19bed004b..f7ab85dc0c 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -12,7 +12,7 @@ package scala.concurrent.impl import java.util.concurrent.TimeUnit.NANOSECONDS import scala.concurrent.{ ExecutionContext, CanAwait, OnCompleteRunnable, TimeoutException, ExecutionException } -import scala.concurrent.util.Duration +import scala.concurrent.util.{ Duration, Deadline } import scala.annotation.tailrec import scala.util.control.NonFatal import scala.util.{ Try, Success, Failure } @@ -64,30 +64,41 @@ private[concurrent] object Promise { protected final def tryAwait(atMost: Duration): Boolean = { @tailrec - def awaitUnsafe(waitTimeNanos: Long): Boolean = { - if (value.isEmpty && waitTimeNanos > 0) { - val ms = NANOSECONDS.toMillis(waitTimeNanos) - val ns = (waitTimeNanos % 1000000l).toInt // as per object.wait spec - val start = System.nanoTime() - try { - synchronized { - if (!isCompleted) wait(ms, ns) // previously - this was a `while`, ending up in an infinite loop - } - } catch { - case e: InterruptedException => - } + def awaitUnsafe(deadline: Deadline, nextWait: Duration): Boolean = { + if (!isCompleted && nextWait > Duration.Zero) { + val ms = nextWait.toMillis + val ns = (nextWait.toNanos % 1000000l).toInt // as per object.wait spec + + synchronized { if (!isCompleted) wait(ms, ns) } - awaitUnsafe(waitTimeNanos - (System.nanoTime() - start)) + awaitUnsafe(deadline, deadline.timeLeft) } else isCompleted } - awaitUnsafe(if (atMost.isFinite) atMost.toNanos else Long.MaxValue) + @tailrec + def awaitUnbounded(): Boolean = { + if (isCompleted) true + else { + synchronized { if (!isCompleted) wait() } + awaitUnbounded() + } + } + + if (atMost eq Duration.Undefined) + throw new IllegalArgumentException("cannot wait for Undefined period") + else if (atMost <= Duration.Zero) + isCompleted + else if (atMost == Duration.Inf) + awaitUnbounded() + else + awaitUnsafe(atMost.fromNow, atMost) } @throws(classOf[TimeoutException]) + @throws(classOf[InterruptedException]) def ready(atMost: Duration)(implicit permit: CanAwait): this.type = if (isCompleted || tryAwait(atMost)) this - else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds") + else throw new TimeoutException("Futures timed out after [" + atMost + "]") @throws(classOf[Exception]) def result(atMost: Duration)(implicit permit: CanAwait): T = diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala index a2ef42fac8..1d06341d4d 100644 --- a/src/library/scala/concurrent/package.scala +++ b/src/library/scala/concurrent/package.scala @@ -67,26 +67,39 @@ package concurrent { */ object Await { /** + * Await the "resolved" state of this Awaitable. * Invokes ready() on the awaitable, properly wrapped by a call to `scala.concurrent.blocking`. - * ready() blocks until the awaitable has completed or the timeout expires. * - * Throws a TimeoutException if the timeout expires, as that is in the contract of `Awaitable.ready`. - * @param awaitable the `Awaitable` on which `ready` is to be called - * @param atMost the maximum timeout for which to wait - * @return the result of `awaitable.ready` which is defined to be the awaitable itself. + * @param awaitable + * the `Awaitable` on which `ready` is to be called + * @param atMost + * maximum wait time, which may be negative (no waiting is done), + * [[Duration.Inf]] for unbounded waiting, or a finite positive + * duration + * @return the awaitable itself + * @throws InterruptedException if the wait call was interrupted + * @throws TimeoutException if after waiting for the specified time this Awaitable is still not ready + * @throws IllegalArgumentException if `atMost` is [[Duration.Undefined]] */ @throws(classOf[TimeoutException]) + @throws(classOf[InterruptedException]) def ready[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type = blocking(awaitable.ready(atMost)(AwaitPermission)) /** + * Await and return the result of this Awaitable, which is either of type T or a thrown exception (any Throwable). * Invokes result() on the awaitable, properly wrapped by a call to `scala.concurrent.blocking`. - * result() blocks until the awaitable has completed or the timeout expires. * - * Throws a TimeoutException if the timeout expires, or any exception thrown by `Awaitable.result`. - * @param awaitable the `Awaitable` on which `result` is to be called - * @param atMost the maximum timeout for which to wait - * @return the result of `awaitable.result` + * @param awaitable + * the `Awaitable` on which `result` is to be called + * @param atMost + * maximum wait time, which may be negative (no waiting is done), + * [[Duration.Inf]] for unbounded waiting, or a finite positive + * duration + * @return the value if the Awaitable was successful within the specific maximum wait time + * @throws InterruptedException if the wait call was interrupted + * @throws TimeoutException if after waiting for the specified time this Awaitable is still not ready + * @throws IllegalArgumentException if `atMost` is [[Duration.Undefined]] */ @throws(classOf[Exception]) def result[T](awaitable: Awaitable[T], atMost: Duration): T = diff --git a/src/partest/scala/tools/partest/TestUtil.scala b/src/partest/scala/tools/partest/TestUtil.scala index b86a8e2c7f..146e6fc69f 100644 --- a/src/partest/scala/tools/partest/TestUtil.scala +++ b/src/partest/scala/tools/partest/TestUtil.scala @@ -1,5 +1,7 @@ package scala.tools.partest +import reflect.{ classTag, ClassTag } + trait TestUtil { /** Given function and block of code, evaluates code block, * calls function with nanoseconds elapsed, and returns block result. @@ -29,8 +31,16 @@ trait TestUtil { assert(mult <= acceptableMultiple, "Performance difference too great: multiple = " + mult) } + + def intercept[T <: Exception : ClassTag](code: => Unit): Unit = + try { + code + assert(false, "did not throw " + classTag[T]) + } catch { + case ex: Exception if classTag[T].runtimeClass isInstance ex => + } } object TestUtil extends TestUtil { -}
\ No newline at end of file +} diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala index ffb5608fd2..0e76b711de 100644 --- a/test/files/jvm/scala-concurrent-tck.scala +++ b/test/files/jvm/scala-concurrent-tck.scala @@ -11,6 +11,8 @@ import scala.concurrent.{ import scala.concurrent.{ future, promise, blocking } import scala.util.{ Try, Success, Failure } import scala.concurrent.util.Duration +import scala.reflect.{ classTag, ClassTag } +import scala.tools.partest.TestUtil.intercept trait TestBase { @@ -19,7 +21,7 @@ trait TestBase { body(() => sv put true) sv.take(2000) } - + // def assert(cond: => Boolean) { // try { // Predef.assert(cond) @@ -663,6 +665,29 @@ trait FutureProjections extends TestBase { case nsee: NoSuchElementException => done() } } + + def testAwaitPositiveDuration(): Unit = once { done => + val p = Promise[Int]() + val f = p.future + future { + intercept[IllegalArgumentException] { Await.ready(f, Duration.Undefined) } + p.success(0) + Await.ready(f, Duration.Zero) + Await.ready(f, Duration(500, "ms")) + Await.ready(f, Duration.Inf) + done() + } onFailure { case x => throw x } + } + + def testAwaitNegativeDuration(): Unit = once { done => + val f = Promise().future + future { + intercept[TimeoutException] { Await.ready(f, Duration.Zero) } + intercept[TimeoutException] { Await.ready(f, Duration.MinusInf) } + intercept[TimeoutException] { Await.ready(f, Duration(-500, "ms")) } + done() + } onFailure { case x => throw x } + } testFailedFailureOnComplete() testFailedFailureOnSuccess() @@ -670,6 +695,8 @@ trait FutureProjections extends TestBase { testFailedSuccessOnFailure() testFailedFailureAwait() testFailedSuccessAwait() + testAwaitPositiveDuration() + testAwaitNegativeDuration() } |