summaryrefslogtreecommitdiff
path: root/src/library
diff options
context:
space:
mode:
authoraleksandar <aleksandar@lampmac14.epfl.ch>2012-01-12 16:20:25 +0100
committeraleksandar <aleksandar@lampmac14.epfl.ch>2012-01-12 16:20:25 +0100
commitd595324efe2be1c552bad8201aaef9ce383e5c95 (patch)
tree4a9eb0d11164791f1bb002ac3b6b8f64993ef875 /src/library
parent7a6d66399eab9e29b0e4270b1c7e47c20471c91d (diff)
downloadscala-d595324efe2be1c552bad8201aaef9ce383e5c95.tar.gz
scala-d595324efe2be1c552bad8201aaef9ce383e5c95.tar.bz2
scala-d595324efe2be1c552bad8201aaef9ce383e5c95.zip
Refactor await calls for awaitable objects to ready and result calls.
Diffstat (limited to 'src/library')
-rw-r--r--src/library/scala/concurrent/Future.scala123
-rw-r--r--src/library/scala/concurrent/akka/Future.scala16
-rw-r--r--src/library/scala/concurrent/package.scala15
3 files changed, 74 insertions, 80 deletions
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index ada6736132..748d08be9f 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -1,29 +1,32 @@
-
-/**
- * Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
- */
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
package scala.concurrent
-//import akka.AkkaException (replaced with Exception)
-//import akka.event.Logging.Error (removed all logging)
+
+
import scala.util.{ Timeout, Duration }
import scala.Option
-//import akka.japi.{ Procedure, Function ⇒ JFunc, Option ⇒ JOption } (commented methods)
import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable }
-import java.util.concurrent.TimeUnit.{ NANOSECONDS ⇒ NANOS, MILLISECONDS ⇒ MILLIS }
-import java.lang.{ Iterable ⇒ JIterable }
-import java.util.{ LinkedList ⇒ JLinkedList }
+import java.util.concurrent.TimeUnit.{ NANOSECONDS => NANOS, MILLISECONDS ⇒ MILLIS }
+import java.lang.{ Iterable => JIterable }
+import java.util.{ LinkedList => JLinkedList }
import scala.annotation.tailrec
import scala.collection.mutable.Stack
-//import akka.util.Switch (commented method)
-import java.{ lang ⇒ jl }
+import java.{ lang => jl }
import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean }
import scala.collection.mutable.Builder
import scala.collection.generic.CanBuildFrom
+
+
/** The trait that represents futures.
*
* @define multipleCallbacks
@@ -95,20 +98,6 @@ self =>
case Right(v) => // do nothing
}
- /* To be removed
- /** When this future times out, apply the provided function.
- *
- * If the future has already timed out,
- * this will either be applied immediately or be scheduled asynchronously.
- *
- * $multipleCallbacks
- */
- def onTimeout[U](callback: FutureTimeoutException => U): this.type = onComplete {
- case Left(te: FutureTimeoutException) => callback(te)
- case Right(v) => // do nothing
- }
- */
-
/** When this future is completed, either through an exception, a timeout, or a value,
* apply the provided function.
*
@@ -130,14 +119,6 @@ self =>
*/
def newPromise[S]: Promise[S] = executionContext promise
- /*
- /** Tests whether this `Future`'s timeout has expired.
- *
- * $futureTimeout
- */
- def isTimedout: Boolean
- */
-
/* Projections */
@@ -176,46 +157,6 @@ self =>
new NoSuchElementException("Future.failed not completed with a throwable. Instead completed with: " + v)
}
- /*
- /** A timed out projection of this future.
- *
- * The timed out projection is a future holding a value of type `FutureTimeoutException`.
- *
- * It is completed with a value which is a `FutureTimeoutException` of the original future
- * in case the original future is timed out.
- *
- * It is failed with a `NoSuchElementException` if the original future is completed successfully.
- * It is failed with the original exception otherwise.
- *
- * Blocking on this future returns a value only if the original future timed out, and a
- * corresponding exception otherwise.
- */
- def timedout: Future[FutureTimeoutException] = new Future[FutureTimeoutException] {
- def executionContext = self.executionContext
- def onComplete[U](func: Either[Throwable, FutureTimeoutException] => U) = {
- self.onComplete {
- case Left(te: FutureTimeoutException) => func(Right(te))
- case Left(t) => func(Left(noSuchElemThrowable(t)))
- case Right(v) => func(Left(noSuchElemValue(v)))
- }
- this
- }
- def isTimedout = self.isTimedout
- def block()(implicit canblock: CanBlock) = try {
- val res = self.block()
- throw noSuchElemValue(res)
- } catch {
- case ft: FutureTimeoutException =>
- ft
- case t: Throwable =>
- throw noSuchElemThrowable(t)
- }
- private def noSuchElemValue(v: T) =
- new NoSuchElementException("Future.timedout didn't time out. Instead completed with: " + v)
- private def noSuchElemThrowable(v: Throwable) =
- new NoSuchElementException("Future.timedout didn't time out. Instead failed with: " + v)
- }
- */
/* Monadic operations */
@@ -299,7 +240,7 @@ self =>
* Example:
* {{{
* val f = future { 5 }
- * val g = g filter { _ % 2 == 1 }
+ * val g = f filter { _ % 2 == 1 }
* val h = f filter { _ % 2 == 0 }
* block on g // evaluates to 5
* block on h // throw a NoSuchElementException
@@ -316,6 +257,38 @@ self =>
p.future
}
+ /** Creates a new future by mapping the value of the current future if the given partial function is defined at that value.
+ *
+ *
+ * If the current future contains a value for which the partial function is defined, the new future will also hold that value.
+ * Otherwise, the resulting future will fail with a `NoSuchElementException`.
+ *
+ * If the current future fails or times out, the resulting future also fails or times out, respectively.
+ *
+ * Example:
+ * {{{
+ * val f = future { -5 }
+ * val g = f collect {
+ * case x if x < 0 => -x
+ * }
+ * val h = f collect {
+ * case x if x > 0 => x * 2
+ * }
+ * block on g // evaluates to 5
+ * block on h // throw a NoSuchElementException
+ * }}}
+ */
+ def collect[S](pf: PartialFunction[T, S]): Future[S] = {
+ val p = newPromise[S]
+
+ onComplete {
+ case Left(t) => p failure t
+ case Right(v) => if (pf.isDefinedAt(v)) p success pf(v) else p failure new NoSuchElementException("Future.collect partial function is not defined at: " + v)
+ }
+
+ p.future
+ }
+
}
diff --git a/src/library/scala/concurrent/akka/Future.scala b/src/library/scala/concurrent/akka/Future.scala
new file mode 100644
index 0000000000..e359456736
--- /dev/null
+++ b/src/library/scala/concurrent/akka/Future.scala
@@ -0,0 +1,16 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent
+package akka
+
+
+
+
+
+
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
index 61137fbc6e..666e12456d 100644
--- a/src/library/scala/concurrent/package.scala
+++ b/src/library/scala/concurrent/package.scala
@@ -67,9 +67,9 @@ package object concurrent {
* - InterruptedException - in the case that a wait within the blockable object was interrupted
* - TimeoutException - in the case that the blockable object timed out
*/
- def await[T](timeout: Timeout)(body: =>T): T = await(timeout, new Awaitable[T] {
+ def await[T](atMost: Duration)(body: =>T): T = result(new Awaitable[T] {
def await(timeout: Timeout)(implicit cb: CanAwait) = body
- })
+ }, atMost)
/** Blocks on a blockable object.
*
@@ -80,13 +80,18 @@ package object concurrent {
* - InterruptedException - in the case that a wait within the blockable object was interrupted
* - TimeoutException - in the case that the blockable object timed out
*/
- def await[T](timeout: Timeout, awaitable: Awaitable[T]): T = {
+ def result[T](awaitable: Awaitable[T], atMost: Duration): T = {
currentExecutionContext.get match {
- case null => awaitable.await(timeout)(null) // outside - TODO - fix timeout case
- case x => x.blockingCall(timeout, awaitable) // inside an execution context thread
+ case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case
+ case x => x.blockingCall(atMost, awaitable) // inside an execution context thread
}
}
+ def ready[T](awaitable: Awaitable[T], atMost: Duration): Awaitable[T] = {
+ result(awaitable, atMost)
+ awaitable
+ }
+
}