summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoland <rk@rkuhn.info>2012-09-10 15:52:07 +0200
committerRoland <rk@rkuhn.info>2012-09-10 15:52:07 +0200
commit3767a6a83efc74d34e9025f798eeb2a043e6df8d (patch)
tree85d5b78144bf8a9ab4ddd41c74c69ef2d7bb0db5
parentb7e08723d142c8227181eed283e7c982f449425a (diff)
downloadscala-3767a6a83efc74d34e9025f798eeb2a043e6df8d.tar.gz
scala-3767a6a83efc74d34e9025f798eeb2a043e6df8d.tar.bz2
scala-3767a6a83efc74d34e9025f798eeb2a043e6df8d.zip
fix usage of Duration in Promise impl
- correctly treat MinusInf and Undefined - don't toMillis in the timeout message (could be MinusInf) - also notice that Inf did not actually wait unbounded - and further notice that tryAwait swallows InterruptedException instead of bailing out early => changed to do so and added throws annotation - also removed some unused imports of Duration
-rw-r--r--src/library/scala/concurrent/Awaitable.scala5
-rw-r--r--src/library/scala/concurrent/BlockContext.scala3
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala1
-rw-r--r--src/library/scala/concurrent/Future.scala1
-rw-r--r--src/library/scala/concurrent/impl/ExecutionContextImpl.scala1
-rw-r--r--src/library/scala/concurrent/impl/Promise.scala33
-rw-r--r--src/library/scala/concurrent/package.scala3
-rw-r--r--test/files/jvm/scala-concurrent-tck.scala34
8 files changed, 66 insertions, 15 deletions
diff --git a/src/library/scala/concurrent/Awaitable.scala b/src/library/scala/concurrent/Awaitable.scala
index 99bdfbc5a9..2205dd9869 100644
--- a/src/library/scala/concurrent/Awaitable.scala
+++ b/src/library/scala/concurrent/Awaitable.scala
@@ -18,13 +18,18 @@ trait Awaitable[+T] {
/**
* Should throw [[scala.concurrent.TimeoutException]] if it times out
* This method should not be called directly.
+ *
+ * @throws InterruptedException if the wait call was interrupted
*/
@throws(classOf[TimeoutException])
+ @throws(classOf[InterruptedException])
def ready(atMost: Duration)(implicit permit: CanAwait): this.type
/**
* Throws exceptions if it cannot produce a T within the specified time.
* This method should not be called directly.
+ *
+ * @throws InterruptedException if the wait call was interrupted
*/
@throws(classOf[Exception])
def result(atMost: Duration)(implicit permit: CanAwait): T
diff --git a/src/library/scala/concurrent/BlockContext.scala b/src/library/scala/concurrent/BlockContext.scala
index 640560a174..83333a9e94 100644
--- a/src/library/scala/concurrent/BlockContext.scala
+++ b/src/library/scala/concurrent/BlockContext.scala
@@ -8,9 +8,6 @@
package scala.concurrent
-import java.lang.Thread
-import scala.concurrent.util.Duration
-
/**
* A context to be notified by `scala.concurrent.blocking` when
* a thread is about to block. In effect this trait provides
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
index 1be6050303..844ec14241 100644
--- a/src/library/scala/concurrent/ExecutionContext.scala
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -10,7 +10,6 @@ package scala.concurrent
import java.util.concurrent.{ ExecutorService, Executor }
-import scala.concurrent.util.Duration
import scala.annotation.implicitNotFound
import scala.util.Try
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index bc0b437a33..a5d9cdd5d1 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -16,7 +16,6 @@ import java.lang.{ Iterable => JIterable }
import java.util.{ LinkedList => JLinkedList }
import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean }
-import scala.concurrent.util.Duration
import scala.util.control.NonFatal
import scala.Option
import scala.util.{Try, Success, Failure}
diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
index 875a558887..c517a05a81 100644
--- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
+++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
@@ -14,7 +14,6 @@ import java.util.concurrent.{ LinkedBlockingQueue, Callable, Executor, ExecutorS
import java.util.Collection
import scala.concurrent.forkjoin._
import scala.concurrent.{ BlockContext, ExecutionContext, Awaitable, CanAwait, ExecutionContextExecutor, ExecutionContextExecutorService }
-import scala.concurrent.util.Duration
import scala.util.control.NonFatal
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala
index b19bed004b..35aac974ec 100644
--- a/src/library/scala/concurrent/impl/Promise.scala
+++ b/src/library/scala/concurrent/impl/Promise.scala
@@ -65,29 +65,44 @@ private[concurrent] object Promise {
protected final def tryAwait(atMost: Duration): Boolean = {
@tailrec
def awaitUnsafe(waitTimeNanos: Long): Boolean = {
- if (value.isEmpty && waitTimeNanos > 0) {
+ if (!isCompleted && waitTimeNanos > 0) {
val ms = NANOSECONDS.toMillis(waitTimeNanos)
val ns = (waitTimeNanos % 1000000l).toInt // as per object.wait spec
val start = System.nanoTime()
- try {
- synchronized {
- if (!isCompleted) wait(ms, ns) // previously - this was a `while`, ending up in an infinite loop
- }
- } catch {
- case e: InterruptedException =>
+ synchronized {
+ if (!isCompleted) wait(ms, ns) // previously - this was a `while`, ending up in an infinite loop
}
awaitUnsafe(waitTimeNanos - (System.nanoTime() - start))
} else
isCompleted
}
- awaitUnsafe(if (atMost.isFinite) atMost.toNanos else Long.MaxValue)
+ @tailrec
+ def awaitUnbounded(): Boolean = {
+ if (isCompleted) true
+ else {
+ synchronized {
+ if (!isCompleted) wait()
+ }
+ awaitUnbounded()
+ }
+ }
+
+ if (atMost <= Duration.Zero)
+ isCompleted
+ else if (atMost eq Duration.Undefined)
+ throw new IllegalArgumentException("cannot wait for Undefined period")
+ else if (atMost == Duration.Inf)
+ awaitUnbounded()
+ else
+ awaitUnsafe(atMost.toNanos)
}
@throws(classOf[TimeoutException])
+ @throws(classOf[InterruptedException])
def ready(atMost: Duration)(implicit permit: CanAwait): this.type =
if (isCompleted || tryAwait(atMost)) this
- else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds")
+ else throw new TimeoutException("Futures timed out after [" + atMost + "]")
@throws(classOf[Exception])
def result(atMost: Duration)(implicit permit: CanAwait): T =
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
index a2ef42fac8..3681109653 100644
--- a/src/library/scala/concurrent/package.scala
+++ b/src/library/scala/concurrent/package.scala
@@ -74,8 +74,10 @@ package concurrent {
* @param awaitable the `Awaitable` on which `ready` is to be called
* @param atMost the maximum timeout for which to wait
* @return the result of `awaitable.ready` which is defined to be the awaitable itself.
+ * @throws InterruptedException if the wait call was interrupted
*/
@throws(classOf[TimeoutException])
+ @throws(classOf[InterruptedException])
def ready[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type =
blocking(awaitable.ready(atMost)(AwaitPermission))
@@ -87,6 +89,7 @@ package concurrent {
* @param awaitable the `Awaitable` on which `result` is to be called
* @param atMost the maximum timeout for which to wait
* @return the result of `awaitable.result`
+ * @throws InterruptedException if the wait call was interrupted
*/
@throws(classOf[Exception])
def result[T](awaitable: Awaitable[T], atMost: Duration): T =
diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala
index ffb5608fd2..a60f3c8a63 100644
--- a/test/files/jvm/scala-concurrent-tck.scala
+++ b/test/files/jvm/scala-concurrent-tck.scala
@@ -11,6 +11,7 @@ import scala.concurrent.{
import scala.concurrent.{ future, promise, blocking }
import scala.util.{ Try, Success, Failure }
import scala.concurrent.util.Duration
+import scala.reflect.{ classTag, ClassTag }
trait TestBase {
@@ -19,6 +20,14 @@ trait TestBase {
body(() => sv put true)
sv.take(2000)
}
+
+ def intercept[T <: Exception : ClassTag](code: => Unit): Unit =
+ try {
+ code
+ assert(false, "did not throw " + classTag[T])
+ } catch {
+ case ex: Exception if classTag[T].runtimeClass isInstance ex =>
+ }
// def assert(cond: => Boolean) {
// try {
@@ -663,6 +672,29 @@ trait FutureProjections extends TestBase {
case nsee: NoSuchElementException => done()
}
}
+
+ def testAwaitPositiveDuration(): Unit = once { done =>
+ val p = Promise[Int]()
+ val f = p.future
+ future {
+ intercept[IllegalArgumentException] { Await.ready(f, Duration.Undefined) }
+ p.success(0)
+ Await.ready(f, Duration.Zero)
+ Await.ready(f, Duration(500, "ms"))
+ Await.ready(f, Duration.Inf)
+ done()
+ } onFailure { case x => throw x }
+ }
+
+ def testAwaitNegativeDuration(): Unit = once { done =>
+ val f = Promise().future
+ future {
+ intercept[TimeoutException] { Await.ready(f, Duration.Zero) }
+ intercept[TimeoutException] { Await.ready(f, Duration.MinusInf) }
+ intercept[TimeoutException] { Await.ready(f, Duration(-500, "ms")) }
+ done()
+ } onFailure { case x => throw x }
+ }
testFailedFailureOnComplete()
testFailedFailureOnSuccess()
@@ -670,6 +702,8 @@ trait FutureProjections extends TestBase {
testFailedSuccessOnFailure()
testFailedFailureAwait()
testFailedSuccessAwait()
+ testAwaitPositiveDuration()
+ testAwaitNegativeDuration()
}