summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Suereth <Joshua.Suereth@gmail.com>2012-09-11 12:00:15 -0700
committerJosh Suereth <Joshua.Suereth@gmail.com>2012-09-11 12:00:15 -0700
commitea651e6fe187920d207aa5fe3c645d294e72e627 (patch)
treeb1f8a44115d5bad5e298f31f6123993088240056
parentad9b6bcdbcd48566f0b7f03b29c9252fcebc3926 (diff)
parentad6c261eafbe72e014d005d0452c8a628a07f123 (diff)
downloadscala-ea651e6fe187920d207aa5fe3c645d294e72e627.tar.gz
scala-ea651e6fe187920d207aa5fe3c645d294e72e627.tar.bz2
scala-ea651e6fe187920d207aa5fe3c645d294e72e627.zip
Merge pull request #1279 from rkuhn/fix-duration-usage
fix usage of Duration in Promise impl
-rw-r--r--src/library/scala/concurrent/Awaitable.scala23
-rw-r--r--src/library/scala/concurrent/BlockContext.scala3
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala1
-rw-r--r--src/library/scala/concurrent/Future.scala1
-rw-r--r--src/library/scala/concurrent/impl/ExecutionContextImpl.scala1
-rw-r--r--src/library/scala/concurrent/impl/Promise.scala43
-rw-r--r--src/library/scala/concurrent/package.scala33
-rw-r--r--src/partest/scala/tools/partest/TestUtil.scala12
-rw-r--r--test/files/jvm/scala-concurrent-tck.scala29
9 files changed, 110 insertions, 36 deletions
diff --git a/src/library/scala/concurrent/Awaitable.scala b/src/library/scala/concurrent/Awaitable.scala
index 99bdfbc5a9..655115349a 100644
--- a/src/library/scala/concurrent/Awaitable.scala
+++ b/src/library/scala/concurrent/Awaitable.scala
@@ -16,15 +16,34 @@ import scala.concurrent.util.Duration
trait Awaitable[+T] {
/**
- * Should throw [[scala.concurrent.TimeoutException]] if it times out
+ * Await the "resolved" state of this Awaitable.
* This method should not be called directly.
+ *
+ * @param atMost
+ * maximum wait time, which may be negative (no waiting is done),
+ * [[Duration.Inf]] for unbounded waiting, or a finite positive
+ * duration
+ * @return the Awaitable itself
+ * @throws InterruptedException if the wait call was interrupted
+ * @throws TimeoutException if after waiting for the specified time this Awaitable is still not ready
+ * @throws IllegalArgumentException if `atMost` is [[Duration.Undefined]]
*/
@throws(classOf[TimeoutException])
+ @throws(classOf[InterruptedException])
def ready(atMost: Duration)(implicit permit: CanAwait): this.type
/**
- * Throws exceptions if it cannot produce a T within the specified time.
+ * Await and return the result of this Awaitable, which is either of type T or a thrown exception (any Throwable).
* This method should not be called directly.
+ *
+ * @param atMost
+ * maximum wait time, which may be negative (no waiting is done),
+ * [[Duration.Inf]] for unbounded waiting, or a finite positive
+ * duration
+ * @return the value if the Awaitable was successful within the specific maximum wait time
+ * @throws InterruptedException if the wait call was interrupted
+ * @throws TimeoutException if after waiting for the specified time this Awaitable is still not ready
+ * @throws IllegalArgumentException if `atMost` is [[Duration.Undefined]]
*/
@throws(classOf[Exception])
def result(atMost: Duration)(implicit permit: CanAwait): T
diff --git a/src/library/scala/concurrent/BlockContext.scala b/src/library/scala/concurrent/BlockContext.scala
index 640560a174..83333a9e94 100644
--- a/src/library/scala/concurrent/BlockContext.scala
+++ b/src/library/scala/concurrent/BlockContext.scala
@@ -8,9 +8,6 @@
package scala.concurrent
-import java.lang.Thread
-import scala.concurrent.util.Duration
-
/**
* A context to be notified by `scala.concurrent.blocking` when
* a thread is about to block. In effect this trait provides
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
index 1be6050303..844ec14241 100644
--- a/src/library/scala/concurrent/ExecutionContext.scala
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -10,7 +10,6 @@ package scala.concurrent
import java.util.concurrent.{ ExecutorService, Executor }
-import scala.concurrent.util.Duration
import scala.annotation.implicitNotFound
import scala.util.Try
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index bc0b437a33..a5d9cdd5d1 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -16,7 +16,6 @@ import java.lang.{ Iterable => JIterable }
import java.util.{ LinkedList => JLinkedList }
import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean }
-import scala.concurrent.util.Duration
import scala.util.control.NonFatal
import scala.Option
import scala.util.{Try, Success, Failure}
diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
index 875a558887..c517a05a81 100644
--- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
+++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
@@ -14,7 +14,6 @@ import java.util.concurrent.{ LinkedBlockingQueue, Callable, Executor, ExecutorS
import java.util.Collection
import scala.concurrent.forkjoin._
import scala.concurrent.{ BlockContext, ExecutionContext, Awaitable, CanAwait, ExecutionContextExecutor, ExecutionContextExecutorService }
-import scala.concurrent.util.Duration
import scala.util.control.NonFatal
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala
index b19bed004b..f7ab85dc0c 100644
--- a/src/library/scala/concurrent/impl/Promise.scala
+++ b/src/library/scala/concurrent/impl/Promise.scala
@@ -12,7 +12,7 @@ package scala.concurrent.impl
import java.util.concurrent.TimeUnit.NANOSECONDS
import scala.concurrent.{ ExecutionContext, CanAwait, OnCompleteRunnable, TimeoutException, ExecutionException }
-import scala.concurrent.util.Duration
+import scala.concurrent.util.{ Duration, Deadline }
import scala.annotation.tailrec
import scala.util.control.NonFatal
import scala.util.{ Try, Success, Failure }
@@ -64,30 +64,41 @@ private[concurrent] object Promise {
protected final def tryAwait(atMost: Duration): Boolean = {
@tailrec
- def awaitUnsafe(waitTimeNanos: Long): Boolean = {
- if (value.isEmpty && waitTimeNanos > 0) {
- val ms = NANOSECONDS.toMillis(waitTimeNanos)
- val ns = (waitTimeNanos % 1000000l).toInt // as per object.wait spec
- val start = System.nanoTime()
- try {
- synchronized {
- if (!isCompleted) wait(ms, ns) // previously - this was a `while`, ending up in an infinite loop
- }
- } catch {
- case e: InterruptedException =>
- }
+ def awaitUnsafe(deadline: Deadline, nextWait: Duration): Boolean = {
+ if (!isCompleted && nextWait > Duration.Zero) {
+ val ms = nextWait.toMillis
+ val ns = (nextWait.toNanos % 1000000l).toInt // as per object.wait spec
+
+ synchronized { if (!isCompleted) wait(ms, ns) }
- awaitUnsafe(waitTimeNanos - (System.nanoTime() - start))
+ awaitUnsafe(deadline, deadline.timeLeft)
} else
isCompleted
}
- awaitUnsafe(if (atMost.isFinite) atMost.toNanos else Long.MaxValue)
+ @tailrec
+ def awaitUnbounded(): Boolean = {
+ if (isCompleted) true
+ else {
+ synchronized { if (!isCompleted) wait() }
+ awaitUnbounded()
+ }
+ }
+
+ if (atMost eq Duration.Undefined)
+ throw new IllegalArgumentException("cannot wait for Undefined period")
+ else if (atMost <= Duration.Zero)
+ isCompleted
+ else if (atMost == Duration.Inf)
+ awaitUnbounded()
+ else
+ awaitUnsafe(atMost.fromNow, atMost)
}
@throws(classOf[TimeoutException])
+ @throws(classOf[InterruptedException])
def ready(atMost: Duration)(implicit permit: CanAwait): this.type =
if (isCompleted || tryAwait(atMost)) this
- else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds")
+ else throw new TimeoutException("Futures timed out after [" + atMost + "]")
@throws(classOf[Exception])
def result(atMost: Duration)(implicit permit: CanAwait): T =
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
index a2ef42fac8..1d06341d4d 100644
--- a/src/library/scala/concurrent/package.scala
+++ b/src/library/scala/concurrent/package.scala
@@ -67,26 +67,39 @@ package concurrent {
*/
object Await {
/**
+ * Await the "resolved" state of this Awaitable.
* Invokes ready() on the awaitable, properly wrapped by a call to `scala.concurrent.blocking`.
- * ready() blocks until the awaitable has completed or the timeout expires.
*
- * Throws a TimeoutException if the timeout expires, as that is in the contract of `Awaitable.ready`.
- * @param awaitable the `Awaitable` on which `ready` is to be called
- * @param atMost the maximum timeout for which to wait
- * @return the result of `awaitable.ready` which is defined to be the awaitable itself.
+ * @param awaitable
+ * the `Awaitable` on which `ready` is to be called
+ * @param atMost
+ * maximum wait time, which may be negative (no waiting is done),
+ * [[Duration.Inf]] for unbounded waiting, or a finite positive
+ * duration
+ * @return the awaitable itself
+ * @throws InterruptedException if the wait call was interrupted
+ * @throws TimeoutException if after waiting for the specified time this Awaitable is still not ready
+ * @throws IllegalArgumentException if `atMost` is [[Duration.Undefined]]
*/
@throws(classOf[TimeoutException])
+ @throws(classOf[InterruptedException])
def ready[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type =
blocking(awaitable.ready(atMost)(AwaitPermission))
/**
+ * Await and return the result of this Awaitable, which is either of type T or a thrown exception (any Throwable).
* Invokes result() on the awaitable, properly wrapped by a call to `scala.concurrent.blocking`.
- * result() blocks until the awaitable has completed or the timeout expires.
*
- * Throws a TimeoutException if the timeout expires, or any exception thrown by `Awaitable.result`.
- * @param awaitable the `Awaitable` on which `result` is to be called
- * @param atMost the maximum timeout for which to wait
- * @return the result of `awaitable.result`
+ * @param awaitable
+ * the `Awaitable` on which `result` is to be called
+ * @param atMost
+ * maximum wait time, which may be negative (no waiting is done),
+ * [[Duration.Inf]] for unbounded waiting, or a finite positive
+ * duration
+ * @return the value if the Awaitable was successful within the specific maximum wait time
+ * @throws InterruptedException if the wait call was interrupted
+ * @throws TimeoutException if after waiting for the specified time this Awaitable is still not ready
+ * @throws IllegalArgumentException if `atMost` is [[Duration.Undefined]]
*/
@throws(classOf[Exception])
def result[T](awaitable: Awaitable[T], atMost: Duration): T =
diff --git a/src/partest/scala/tools/partest/TestUtil.scala b/src/partest/scala/tools/partest/TestUtil.scala
index b86a8e2c7f..146e6fc69f 100644
--- a/src/partest/scala/tools/partest/TestUtil.scala
+++ b/src/partest/scala/tools/partest/TestUtil.scala
@@ -1,5 +1,7 @@
package scala.tools.partest
+import reflect.{ classTag, ClassTag }
+
trait TestUtil {
/** Given function and block of code, evaluates code block,
* calls function with nanoseconds elapsed, and returns block result.
@@ -29,8 +31,16 @@ trait TestUtil {
assert(mult <= acceptableMultiple, "Performance difference too great: multiple = " + mult)
}
+
+ def intercept[T <: Exception : ClassTag](code: => Unit): Unit =
+ try {
+ code
+ assert(false, "did not throw " + classTag[T])
+ } catch {
+ case ex: Exception if classTag[T].runtimeClass isInstance ex =>
+ }
}
object TestUtil extends TestUtil {
-} \ No newline at end of file
+}
diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala
index ffb5608fd2..0e76b711de 100644
--- a/test/files/jvm/scala-concurrent-tck.scala
+++ b/test/files/jvm/scala-concurrent-tck.scala
@@ -11,6 +11,8 @@ import scala.concurrent.{
import scala.concurrent.{ future, promise, blocking }
import scala.util.{ Try, Success, Failure }
import scala.concurrent.util.Duration
+import scala.reflect.{ classTag, ClassTag }
+import scala.tools.partest.TestUtil.intercept
trait TestBase {
@@ -19,7 +21,7 @@ trait TestBase {
body(() => sv put true)
sv.take(2000)
}
-
+
// def assert(cond: => Boolean) {
// try {
// Predef.assert(cond)
@@ -663,6 +665,29 @@ trait FutureProjections extends TestBase {
case nsee: NoSuchElementException => done()
}
}
+
+ def testAwaitPositiveDuration(): Unit = once { done =>
+ val p = Promise[Int]()
+ val f = p.future
+ future {
+ intercept[IllegalArgumentException] { Await.ready(f, Duration.Undefined) }
+ p.success(0)
+ Await.ready(f, Duration.Zero)
+ Await.ready(f, Duration(500, "ms"))
+ Await.ready(f, Duration.Inf)
+ done()
+ } onFailure { case x => throw x }
+ }
+
+ def testAwaitNegativeDuration(): Unit = once { done =>
+ val f = Promise().future
+ future {
+ intercept[TimeoutException] { Await.ready(f, Duration.Zero) }
+ intercept[TimeoutException] { Await.ready(f, Duration.MinusInf) }
+ intercept[TimeoutException] { Await.ready(f, Duration(-500, "ms")) }
+ done()
+ } onFailure { case x => throw x }
+ }
testFailedFailureOnComplete()
testFailedFailureOnSuccess()
@@ -670,6 +695,8 @@ trait FutureProjections extends TestBase {
testFailedSuccessOnFailure()
testFailedFailureAwait()
testFailedSuccessAwait()
+ testAwaitPositiveDuration()
+ testAwaitNegativeDuration()
}