summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoraleksandar <aleksandar@lampmac14.epfl.ch>2011-12-13 13:31:06 +0100
committeraleksandar <aleksandar@lampmac14.epfl.ch>2011-12-13 13:31:06 +0100
commit65b27aa3ab58b39a3b68999f4c6b9d3af7a81f85 (patch)
tree57f57618a306f852349dfb662a2caf97c1c3732d
parentd9981460533949182691e2d83079af364d2dfcde (diff)
downloadscala-65b27aa3ab58b39a3b68999f4c6b9d3af7a81f85.tar.gz
scala-65b27aa3ab58b39a3b68999f4c6b9d3af7a81f85.tar.bz2
scala-65b27aa3ab58b39a3b68999f4c6b9d3af7a81f85.zip
Rename block->await. Add more tck test cases.
-rw-r--r--src/library/scala/concurrent/Awaitable.scala (renamed from src/library/scala/concurrent/Blockable.scala)7
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala2
-rw-r--r--src/library/scala/concurrent/Future.scala6
-rw-r--r--src/library/scala/concurrent/default/TaskImpl.scala12
-rw-r--r--src/library/scala/concurrent/package.scala14
-rw-r--r--test/files/jvm/scala-concurrent-tck.scala95
6 files changed, 108 insertions, 28 deletions
diff --git a/src/library/scala/concurrent/Blockable.scala b/src/library/scala/concurrent/Awaitable.scala
index d5c45a9e4e..85546718d2 100644
--- a/src/library/scala/concurrent/Blockable.scala
+++ b/src/library/scala/concurrent/Awaitable.scala
@@ -11,12 +11,13 @@ package scala.concurrent
import scala.annotation.implicitNotFound
+import scala.util.Timeout
-trait Blockable[+T] {
- @implicitNotFound(msg = "Blocking must be done by calling `block on b`, where `b` is the Blockable object.")
- def block()(implicit canblock: CanBlock): T
+trait Awaitable[+T] {
+ @implicitNotFound(msg = "Waiting must be done by calling `await(timeout) b`, where `b` is the `Awaitable` object.")
+ def await(timeout: Timeout)(implicit canblock: CanBlock): T
}
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
index 77d805b19c..b7b3e901e6 100644
--- a/src/library/scala/concurrent/ExecutionContext.scala
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -23,7 +23,7 @@ trait ExecutionContext {
def future[T](body: => T): Future[T]
/** Only callable from the tasks running on the same execution context. */
- def blockingCall[T](timeout: Timeout, body: Blockable[T]): T
+ def blockingCall[T](timeout: Timeout, body: Awaitable[T]): T
}
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index f653a8a47d..9937d43b23 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -59,7 +59,7 @@ import scala.collection.generic.CanBuildFrom
* f flatMap { (x: Int) => g map { (y: Int) => x + y } }
* }}}
*/
-trait Future[+T] extends Blockable[T] {
+trait Future[+T] extends Awaitable[T] {
self =>
/* Callbacks */
@@ -161,8 +161,8 @@ self =>
}
this
}
- def block()(implicit canblock: CanBlock) = try {
- val res = self.block()
+ def await(timeout: Timeout)(implicit canblock: CanBlock) = try {
+ val res = self.await(timeout)
throw noSuchElem(res)
} catch {
case t: Throwable => t
diff --git a/src/library/scala/concurrent/default/TaskImpl.scala b/src/library/scala/concurrent/default/TaskImpl.scala
index ea465225c8..dac6400b45 100644
--- a/src/library/scala/concurrent/default/TaskImpl.scala
+++ b/src/library/scala/concurrent/default/TaskImpl.scala
@@ -122,7 +122,7 @@ private[concurrent] class PromiseImpl[T](context: ExecutionContextImpl)
}
}
- def block()(implicit canblock: scala.concurrent.CanBlock): T = getState match {
+ def await(timeout: Timeout)(implicit canblock: scala.concurrent.CanBlock): T = getState match {
case Success(res) => res
case Failure(t) => throw t
case _ =>
@@ -196,8 +196,8 @@ private[concurrent] class TaskImpl[T](context: ExecutionContextImpl, body: => T)
def tryCancel(): Unit =
tryUnfork()
- def block()(implicit canblock: CanBlock): T = {
- join()
+ def await(timeout: Timeout)(implicit canblock: CanBlock): T = {
+ join() // TODO handle timeout also
(updater.get(this): @unchecked) match {
case Success(r) => r
case Failure(t) => throw t
@@ -263,16 +263,16 @@ private[concurrent] final class ExecutionContextImpl extends ExecutionContext {
new PromiseImpl[T](this)
// TODO fix the timeout
- def blockingCall[T](timeout: Timeout, b: Blockable[T]): T = b match {
+ def blockingCall[T](timeout: Timeout, b: Awaitable[T]): T = b match {
case fj: TaskImpl[_] if fj.executionContext.pool eq pool =>
- fj.block()
+ fj.await(timeout)
case _ =>
var res: T = null.asInstanceOf[T]
@volatile var blockingDone = false
// TODO add exception handling here!
val mb = new ForkJoinPool.ManagedBlocker {
def block() = {
- res = b.block()(CanBlockEvidence)
+ res = b.await(timeout)(CanBlockEvidence)
blockingDone = true
true
}
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
index d93d5b04ba..d9923d6d56 100644
--- a/src/library/scala/concurrent/package.scala
+++ b/src/library/scala/concurrent/package.scala
@@ -59,7 +59,7 @@ package object concurrent {
execCtx promise
/** Used to block on a piece of code which potentially blocks.
- *
+ *
* @param body A piece of code which contains potentially blocking or long running calls.
*
* Calling this method may throw the following exceptions:
@@ -67,23 +67,23 @@ package object concurrent {
* - InterruptedException - in the case that a wait within the blockable object was interrupted
* - TimeoutException - in the case that the blockable object timed out
*/
- def block[T](timeout: Timeout)(body: =>T): T = block(timeout, new Blockable[T] {
- def block()(implicit cb: CanBlock) = body
+ def await[T](timeout: Timeout)(body: =>T): T = await(timeout, new Awaitable[T] {
+ def await(timeout: Timeout)(implicit cb: CanBlock) = body
})
/** Blocks on a blockable object.
*
- * @param blockable An object with a `block` method which runs potentially blocking or long running calls.
+ * @param awaitable An object with a `block` method which runs potentially blocking or long running calls.
*
* Calling this method may throw the following exceptions:
* - CancellationException - if the computation was cancelled
* - InterruptedException - in the case that a wait within the blockable object was interrupted
* - TimeoutException - in the case that the blockable object timed out
*/
- def block[T](timeout: Timeout, blockable: Blockable[T]): T = {
+ def await[T](timeout: Timeout, awaitable: Awaitable[T]): T = {
currentExecutionContext.get match {
- case null => blockable.block()(null) // outside - TODO - fix timeout case
- case x => x.blockingCall(timeout, blockable) // inside an execution context thread
+ case null => awaitable.await(timeout)(null) // outside - TODO - fix timeout case
+ case x => x.blockingCall(timeout, awaitable) // inside an execution context thread
}
}
diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala
index 6e291c396b..774d4236b7 100644
--- a/test/files/jvm/scala-concurrent-tck.scala
+++ b/test/files/jvm/scala-concurrent-tck.scala
@@ -8,15 +8,14 @@ import scala.concurrent.{
SyncVar,
ExecutionException
}
+import scala.concurrent.future
+import scala.concurrent.promise
+import scala.concurrent.await
trait TestBase {
- def future[T](body: =>T): Future[T]
-
- def promise[T]: Promise[T]
-
def once(body: (() => Unit) => Unit) {
val sv = new SyncVar[Boolean]
body(() => sv put true)
@@ -206,6 +205,90 @@ trait FutureCombinators extends TestBase {
trait FutureProjections extends TestBase {
+ def testFailedFailureOnComplete(): Unit = once {
+ done =>
+ val cause = new RuntimeException
+ val f = future {
+ throw cause
+ }
+ f.failed onComplete {
+ case Right(t) =>
+ assert(t == cause)
+ done()
+ case Left(t) =>
+ assert(false)
+ }
+ }
+
+ def testFailedFailureOnSuccess(): Unit = once {
+ done =>
+ val cause = new RuntimeException
+ val f = future {
+ throw cause
+ }
+ f.failed onSuccess {
+ t =>
+ assert(t == cause)
+ done()
+ }
+ }
+
+ def testFailedSuccessOnComplete(): Unit = once {
+ done =>
+ val f = future { 0 }
+ f.failed onComplete {
+ case Right(t) =>
+ assert(false)
+ case Left(t) =>
+ assert(t.isInstanceOf[NoSuchElementException])
+ done()
+ }
+ }
+
+ def testFailedSuccessOnFailure(): Unit = once {
+ done =>
+ val f = future { 0 }
+ f.failed onFailure {
+ case nsee: NoSuchElementException =>
+ done()
+ }
+ }
+
+ def testFailedFailureAwait(): Unit = once {
+ done =>
+ val cause = new RuntimeException
+ val f = future {
+ throw cause
+ }
+ assert(await(0, f.failed) == cause)
+ done()
+ }
+
+ def testFailedSuccessAwait(): Unit = once {
+ done =>
+ val f = future { 0 }
+ try {
+ println(await(0, f.failed))
+ assert(false)
+ } catch {
+ case nsee: NoSuchElementException => done()
+ }
+ }
+
+ testFailedFailureOnComplete()
+ testFailedFailureOnSuccess()
+ testFailedSuccessOnComplete()
+ testFailedSuccessOnFailure()
+ testFailedFailureAwait()
+ //testFailedSuccessAwait()
+
+}
+
+
+trait Blocking extends TestBase {
+
+ // TODO
+
}
@@ -246,10 +329,6 @@ with Promises
with Exceptions
{
- def future[T](body: =>T) = scala.concurrent.future(body)
-
- def promise[T] = scala.concurrent.promise[T]
-
}