summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoraleksandar <aleksandar@lampmac14.epfl.ch>2012-01-30 16:33:05 +0100
committeraleksandar <aleksandar@lampmac14.epfl.ch>2012-01-30 16:33:05 +0100
commit8f36bf7a71f29af5ab61a3f58897881932c1daa3 (patch)
tree89f1c621275f8a91e54d644060ab3526c90e6382
parentb346ad485ff55091a370c93238628ea4fe1249b1 (diff)
downloadscala-8f36bf7a71f29af5ab61a3f58897881932c1daa3.tar.gz
scala-8f36bf7a71f29af5ab61a3f58897881932c1daa3.tar.bz2
scala-8f36bf7a71f29af5ab61a3f58897881932c1daa3.zip
Add some missing methods, remove obsolete methods in futures.
Remove `ensure`. Add `reportFailure` to execution contexts. Add `zip` to futures.
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala2
-rw-r--r--src/library/scala/concurrent/Future.scala47
-rw-r--r--src/library/scala/concurrent/akka/ExecutionContextImpl.scala8
-rw-r--r--src/library/scala/concurrent/akka/Promise.scala2
-rw-r--r--src/library/scala/concurrent/default/TaskImpl.scala2
-rw-r--r--src/library/scala/concurrent/package.scala16
6 files changed, 41 insertions, 36 deletions
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
index 5ad9265f59..40fafd130c 100644
--- a/src/library/scala/concurrent/ExecutionContext.scala
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -37,6 +37,8 @@ trait ExecutionContext {
def blocking[T](awaitable: Awaitable[T], atMost: Duration): T
+ def reportFailure(t: Throwable): Unit
+
/* implementations follow */
private implicit val executionContext = this
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index 1b20c91e49..8d69586fbc 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -330,7 +330,7 @@ self =>
* future (6 / 0) rescue { case e: ArithmeticException => f } // result: Int.MaxValue
* }}}
*/
- def rescue[U >: T](pf: PartialFunction[Throwable, Future[U]]): Future[U] = {
+ def tryRecover[U >: T](pf: PartialFunction[Throwable, Future[U]]): Future[U] = {
val p = newPromise[U]
onComplete {
@@ -349,6 +349,31 @@ self =>
p.future
}
+ /** Zips the values of `this` and `that` future, and creates
+ * a new future holding the tuple of their results.
+ *
+ * If `this` future fails, the resulting future is failed
+ * with the throwable stored in `this`.
+ * Otherwise, if `that` future fails, the resulting future is failed
+ * with the throwable stored in `that`.
+ */
+ def zip[U](that: Future[U]): Future[(T, U)] = {
+ val p = newPromise[(T, U)]
+
+ this onComplete {
+ case Left(t) => p failure t
+ case Right(r) => that onSuccess {
+ case r2 => p success ((r, r2))
+ }
+ }
+
+ that onFailure {
+ case f => p failure f
+ }
+
+ p.future
+ }
+
/** Creates a new future which holds the result of this future if it was completed successfully, or, if not,
* the result of the `that` future if `that` is completed successfully.
* If both futures are failed, the resulting future holds the throwable object of the first future.
@@ -412,26 +437,6 @@ self =>
p.future
}
- /** Executes a piece of code once this future is completed, regardless of whether
- * or not the future fails or succeeds, and returns a new future with the result of this
- * future.
- *
- * This method allows one to enforce ordering.
- *
- * The below example always executes the `println` calls in order:
- * {{{
- * val f = future { 5 }
- * f ensure {
- * println("The value is available.")
- * } ensure {
- * println("The application can now end.")
- * }
- * }}}
- */
- def ensure[U](body: =>U): Future[T] = andThen {
- case _ => body
- }
-
/** Creates a new future which holds the result of either this future or `that` future, depending on
* which future was completed first.
*
diff --git a/src/library/scala/concurrent/akka/ExecutionContextImpl.scala b/src/library/scala/concurrent/akka/ExecutionContextImpl.scala
index 922d77189c..e2773e1fd5 100644
--- a/src/library/scala/concurrent/akka/ExecutionContextImpl.scala
+++ b/src/library/scala/concurrent/akka/ExecutionContextImpl.scala
@@ -22,7 +22,7 @@ class ExecutionContextImpl(executorService: ExecutorService) extends ExecutionCo
def execute(runnable: Runnable): Unit = executorService match {
// case fj: ForkJoinPool =>
- // // TODO fork if more applicable
+ // TODO fork if more applicable
// executorService execute runnable
case _ =>
executorService execute runnable
@@ -60,6 +60,10 @@ class ExecutionContextImpl(executorService: ExecutorService) extends ExecutionCo
}
}
+ def reportFailure(t: Throwable) {
+ t.printStackTrace()
+ }
+
/** Only callable from the tasks running on the same execution context. */
private def blockingCall[T](body: Awaitable[T]): T = {
releaseStack()
@@ -104,7 +108,7 @@ class ExecutionContextImpl(executorService: ExecutorService) extends ExecutionCo
} catch {
case e =>
// TODO catching all and continue isn't good for OOME
- e.printStackTrace()
+ reportFailure(e)
}
}
} finally {
diff --git a/src/library/scala/concurrent/akka/Promise.scala b/src/library/scala/concurrent/akka/Promise.scala
index 923d5baf6d..fad78478a5 100644
--- a/src/library/scala/concurrent/akka/Promise.scala
+++ b/src/library/scala/concurrent/akka/Promise.scala
@@ -199,7 +199,7 @@ object Promise {
try {
func(result)
} catch {
- case e => e.printStackTrace()
+ case e => executor.reportFailure(e)
}
}
}
diff --git a/src/library/scala/concurrent/default/TaskImpl.scala b/src/library/scala/concurrent/default/TaskImpl.scala
index 3e52d79894..7ac297de9e 100644
--- a/src/library/scala/concurrent/default/TaskImpl.scala
+++ b/src/library/scala/concurrent/default/TaskImpl.scala
@@ -290,6 +290,8 @@ private[concurrent] final class ExecutionContextImpl extends ExecutionContext {
res
}
+ def reportFailure(t: Throwable): Unit = {}
+
}
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
index 6c1e323595..5c3f1f818d 100644
--- a/src/library/scala/concurrent/package.scala
+++ b/src/library/scala/concurrent/package.scala
@@ -121,11 +121,6 @@ package object concurrent {
object nondeterministic {
}
- final class DurationOps private[concurrent] (x: Int) {
- // TODO ADD OTHERS
- def ns = util.Duration.fromNanos(0)
- }
-
@inline implicit final def int2durationops(x: Int) = new DurationOps(x)
}
@@ -144,13 +139,10 @@ package concurrent {
def this(origin: Future[_]) = this(origin, "Future timed out.")
}
- /** Evidence that the program can be nondeterministic.
- *
- * Programs in which such an evidence is available in scope
- * can contain calls to methods which yield nondeterministic
- * programs.
- */
- sealed trait NonDeterministic
+ final class DurationOps private[concurrent] (x: Int) {
+ // TODO ADD OTHERS
+ def ns = util.Duration.fromNanos(0)
+ }
}