summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoraleksandar <aleksandar@lampmac14.epfl.ch>2012-01-12 16:20:25 +0100
committeraleksandar <aleksandar@lampmac14.epfl.ch>2012-01-12 16:20:25 +0100
commitd595324efe2be1c552bad8201aaef9ce383e5c95 (patch)
tree4a9eb0d11164791f1bb002ac3b6b8f64993ef875
parent7a6d66399eab9e29b0e4270b1c7e47c20471c91d (diff)
downloadscala-d595324efe2be1c552bad8201aaef9ce383e5c95.tar.gz
scala-d595324efe2be1c552bad8201aaef9ce383e5c95.tar.bz2
scala-d595324efe2be1c552bad8201aaef9ce383e5c95.zip
Refactor await calls for awaitable objects to ready and result calls.
-rw-r--r--src/library/scala/concurrent/Future.scala123
-rw-r--r--src/library/scala/concurrent/akka/Future.scala16
-rw-r--r--src/library/scala/concurrent/package.scala15
-rw-r--r--test/files/jvm/scala-concurrent-tck.scala86
4 files changed, 123 insertions, 117 deletions
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index ada6736132..748d08be9f 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -1,29 +1,32 @@
-
-/**
- * Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
- */
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
package scala.concurrent
-//import akka.AkkaException (replaced with Exception)
-//import akka.event.Logging.Error (removed all logging)
+
+
import scala.util.{ Timeout, Duration }
import scala.Option
-//import akka.japi.{ Procedure, Function ⇒ JFunc, Option ⇒ JOption } (commented methods)
import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable }
-import java.util.concurrent.TimeUnit.{ NANOSECONDS ⇒ NANOS, MILLISECONDS ⇒ MILLIS }
-import java.lang.{ Iterable ⇒ JIterable }
-import java.util.{ LinkedList ⇒ JLinkedList }
+import java.util.concurrent.TimeUnit.{ NANOSECONDS => NANOS, MILLISECONDS ⇒ MILLIS }
+import java.lang.{ Iterable => JIterable }
+import java.util.{ LinkedList => JLinkedList }
import scala.annotation.tailrec
import scala.collection.mutable.Stack
-//import akka.util.Switch (commented method)
-import java.{ lang ⇒ jl }
+import java.{ lang => jl }
import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean }
import scala.collection.mutable.Builder
import scala.collection.generic.CanBuildFrom
+
+
/** The trait that represents futures.
*
* @define multipleCallbacks
@@ -95,20 +98,6 @@ self =>
case Right(v) => // do nothing
}
- /* To be removed
- /** When this future times out, apply the provided function.
- *
- * If the future has already timed out,
- * this will either be applied immediately or be scheduled asynchronously.
- *
- * $multipleCallbacks
- */
- def onTimeout[U](callback: FutureTimeoutException => U): this.type = onComplete {
- case Left(te: FutureTimeoutException) => callback(te)
- case Right(v) => // do nothing
- }
- */
-
/** When this future is completed, either through an exception, a timeout, or a value,
* apply the provided function.
*
@@ -130,14 +119,6 @@ self =>
*/
def newPromise[S]: Promise[S] = executionContext promise
- /*
- /** Tests whether this `Future`'s timeout has expired.
- *
- * $futureTimeout
- */
- def isTimedout: Boolean
- */
-
/* Projections */
@@ -176,46 +157,6 @@ self =>
new NoSuchElementException("Future.failed not completed with a throwable. Instead completed with: " + v)
}
- /*
- /** A timed out projection of this future.
- *
- * The timed out projection is a future holding a value of type `FutureTimeoutException`.
- *
- * It is completed with a value which is a `FutureTimeoutException` of the original future
- * in case the original future is timed out.
- *
- * It is failed with a `NoSuchElementException` if the original future is completed successfully.
- * It is failed with the original exception otherwise.
- *
- * Blocking on this future returns a value only if the original future timed out, and a
- * corresponding exception otherwise.
- */
- def timedout: Future[FutureTimeoutException] = new Future[FutureTimeoutException] {
- def executionContext = self.executionContext
- def onComplete[U](func: Either[Throwable, FutureTimeoutException] => U) = {
- self.onComplete {
- case Left(te: FutureTimeoutException) => func(Right(te))
- case Left(t) => func(Left(noSuchElemThrowable(t)))
- case Right(v) => func(Left(noSuchElemValue(v)))
- }
- this
- }
- def isTimedout = self.isTimedout
- def block()(implicit canblock: CanBlock) = try {
- val res = self.block()
- throw noSuchElemValue(res)
- } catch {
- case ft: FutureTimeoutException =>
- ft
- case t: Throwable =>
- throw noSuchElemThrowable(t)
- }
- private def noSuchElemValue(v: T) =
- new NoSuchElementException("Future.timedout didn't time out. Instead completed with: " + v)
- private def noSuchElemThrowable(v: Throwable) =
- new NoSuchElementException("Future.timedout didn't time out. Instead failed with: " + v)
- }
- */
/* Monadic operations */
@@ -299,7 +240,7 @@ self =>
* Example:
* {{{
* val f = future { 5 }
- * val g = g filter { _ % 2 == 1 }
+ * val g = f filter { _ % 2 == 1 }
* val h = f filter { _ % 2 == 0 }
* block on g // evaluates to 5
* block on h // throw a NoSuchElementException
@@ -316,6 +257,38 @@ self =>
p.future
}
+ /** Creates a new future by mapping the value of the current future if the given partial function is defined at that value.
+ *
+ *
+ * If the current future contains a value for which the partial function is defined, the new future will also hold that value.
+ * Otherwise, the resulting future will fail with a `NoSuchElementException`.
+ *
+ * If the current future fails or times out, the resulting future also fails or times out, respectively.
+ *
+ * Example:
+ * {{{
+ * val f = future { -5 }
+ * val g = f collect {
+ * case x if x < 0 => -x
+ * }
+ * val h = f collect {
+ * case x if x > 0 => x * 2
+ * }
+ * block on g // evaluates to 5
+ * block on h // throw a NoSuchElementException
+ * }}}
+ */
+ def collect[S](pf: PartialFunction[T, S]): Future[S] = {
+ val p = newPromise[S]
+
+ onComplete {
+ case Left(t) => p failure t
+ case Right(v) => if (pf.isDefinedAt(v)) p success pf(v) else p failure new NoSuchElementException("Future.collect partial function is not defined at: " + v)
+ }
+
+ p.future
+ }
+
}
diff --git a/src/library/scala/concurrent/akka/Future.scala b/src/library/scala/concurrent/akka/Future.scala
new file mode 100644
index 0000000000..e359456736
--- /dev/null
+++ b/src/library/scala/concurrent/akka/Future.scala
@@ -0,0 +1,16 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent
+package akka
+
+
+
+
+
+
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
index 61137fbc6e..666e12456d 100644
--- a/src/library/scala/concurrent/package.scala
+++ b/src/library/scala/concurrent/package.scala
@@ -67,9 +67,9 @@ package object concurrent {
* - InterruptedException - in the case that a wait within the blockable object was interrupted
* - TimeoutException - in the case that the blockable object timed out
*/
- def await[T](timeout: Timeout)(body: =>T): T = await(timeout, new Awaitable[T] {
+ def await[T](atMost: Duration)(body: =>T): T = result(new Awaitable[T] {
def await(timeout: Timeout)(implicit cb: CanAwait) = body
- })
+ }, atMost)
/** Blocks on a blockable object.
*
@@ -80,13 +80,18 @@ package object concurrent {
* - InterruptedException - in the case that a wait within the blockable object was interrupted
* - TimeoutException - in the case that the blockable object timed out
*/
- def await[T](timeout: Timeout, awaitable: Awaitable[T]): T = {
+ def result[T](awaitable: Awaitable[T], atMost: Duration): T = {
currentExecutionContext.get match {
- case null => awaitable.await(timeout)(null) // outside - TODO - fix timeout case
- case x => x.blockingCall(timeout, awaitable) // inside an execution context thread
+ case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case
+ case x => x.blockingCall(atMost, awaitable) // inside an execution context thread
}
}
+ def ready[T](awaitable: Awaitable[T], atMost: Duration): Awaitable[T] = {
+ result(awaitable, atMost)
+ awaitable
+ }
+
}
diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala
index ccf1162e19..d62561c92d 100644
--- a/test/files/jvm/scala-concurrent-tck.scala
+++ b/test/files/jvm/scala-concurrent-tck.scala
@@ -11,6 +11,8 @@ import scala.concurrent.{
import scala.concurrent.future
import scala.concurrent.promise
import scala.concurrent.await
+import scala.concurrent.result
+import scala.concurrent.ready
import scala.util.Duration
@@ -42,9 +44,10 @@ trait FutureCallbacks extends TestBase {
val f = future {
x = 1
}
- f onSuccess { _ =>
- done()
- assert(x == 1)
+ f onSuccess {
+ case _ =>
+ done()
+ assert(x == 1)
}
}
@@ -54,12 +57,14 @@ trait FutureCallbacks extends TestBase {
val f = future {
x = 1
}
- f onSuccess { _ =>
+ f onSuccess {
+ case _ =>
assert(x == 1)
x = 2
- f onSuccess { _ =>
- assert(x == 2)
- done()
+ f onSuccess {
+ case _ =>
+ assert(x == 2)
+ done()
}
}
}
@@ -70,8 +75,8 @@ trait FutureCallbacks extends TestBase {
done()
throw new Exception
}
- f onSuccess { _ =>
- assert(false)
+ f onSuccess {
+ case _ => assert(false)
}
}
@@ -82,9 +87,10 @@ trait FutureCallbacks extends TestBase {
x = 1
throw new Exception
}
- f onSuccess { _ =>
- done()
- assert(false)
+ f onSuccess {
+ case _ =>
+ done()
+ assert(false)
}
f onFailure {
case _ =>
@@ -98,9 +104,10 @@ trait FutureCallbacks extends TestBase {
val f = future[Unit] {
throw cause
}
- f onSuccess { _ =>
- done()
- assert(false)
+ f onSuccess {
+ case _ =>
+ done()
+ assert(false)
}
f onFailure {
case e: ExecutionException if (e.getCause == cause) =>
@@ -116,9 +123,10 @@ trait FutureCallbacks extends TestBase {
val f = future[Unit] {
throw new TimeoutException()
}
- f onSuccess { _ =>
- done()
- assert(false)
+ f onSuccess {
+ case _ =>
+ done()
+ assert(false)
}
f onFailure {
case e: TimeoutException =>
@@ -195,9 +203,10 @@ trait FutureCombinators extends TestBase {
} recover {
case re: RuntimeException =>
"recovered"
- } onSuccess { x =>
- done()
- assert(x == "recovered")
+ } onSuccess {
+ case x =>
+ done()
+ assert(x == "recovered")
} onFailure { case any =>
done()
assert(false)
@@ -211,9 +220,10 @@ trait FutureCombinators extends TestBase {
throw cause
} recover {
case te: TimeoutException => "timeout"
- } onSuccess { x =>
- done()
- assert(false)
+ } onSuccess {
+ case x =>
+ done()
+ assert(false)
} onFailure { case any =>
done()
assert(any == cause)
@@ -258,9 +268,9 @@ trait FutureProjections extends TestBase {
throw cause
}
f.failed onSuccess {
- t =>
- assert(t == cause)
- done()
+ case t =>
+ assert(t == cause)
+ done()
}
}
@@ -291,7 +301,7 @@ trait FutureProjections extends TestBase {
val f = future {
throw cause
}
- assert(await(0, f.failed) == cause)
+ assert(result(f.failed, Duration(500, "ms")) == cause)
done()
}
@@ -299,7 +309,7 @@ trait FutureProjections extends TestBase {
done =>
val f = future { 0 }
try {
- await(0, f.failed)
+ ready(f.failed, Duration(0, "ms"))
assert(false)
} catch {
case nsee: NoSuchElementException => done()
@@ -321,7 +331,7 @@ trait Blocking extends TestBase {
def testAwaitSuccess(): Unit = once {
done =>
val f = future { 0 }
- await(Duration(500, "ms"), f)
+ ready(f, Duration(500, "ms"))
done()
}
@@ -332,7 +342,7 @@ trait Blocking extends TestBase {
throw cause
}
try {
- await(Duration(500, "ms"), f)
+ ready(f, Duration(500, "ms"))
assert(false)
} catch {
case t =>
@@ -354,12 +364,14 @@ trait Promises extends TestBase {
val p = promise[Int]()
val f = p.future
- f.onSuccess { x =>
- done()
- assert(x == 5)
- } onFailure { case any =>
- done()
- assert(false)
+ f.onSuccess {
+ case x =>
+ done()
+ assert(x == 5)
+ } onFailure {
+ case any =>
+ done()
+ assert(false)
}
p.success(5)