From 8f36bf7a71f29af5ab61a3f58897881932c1daa3 Mon Sep 17 00:00:00 2001 From: aleksandar Date: Mon, 30 Jan 2012 16:33:05 +0100 Subject: Add some missing methods, remove obsolete methods in futures. Remove `ensure`. Add `reportFailure` to execution contexts. Add `zip` to futures. --- .../scala/concurrent/ExecutionContext.scala | 2 + src/library/scala/concurrent/Future.scala | 47 ++++++++++++---------- .../concurrent/akka/ExecutionContextImpl.scala | 8 +++- src/library/scala/concurrent/akka/Promise.scala | 2 +- .../scala/concurrent/default/TaskImpl.scala | 2 + src/library/scala/concurrent/package.scala | 16 ++------ 6 files changed, 41 insertions(+), 36 deletions(-) (limited to 'src/library') diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala index 5ad9265f59..40fafd130c 100644 --- a/src/library/scala/concurrent/ExecutionContext.scala +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -37,6 +37,8 @@ trait ExecutionContext { def blocking[T](awaitable: Awaitable[T], atMost: Duration): T + def reportFailure(t: Throwable): Unit + /* implementations follow */ private implicit val executionContext = this diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 1b20c91e49..8d69586fbc 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -330,7 +330,7 @@ self => * future (6 / 0) rescue { case e: ArithmeticException => f } // result: Int.MaxValue * }}} */ - def rescue[U >: T](pf: PartialFunction[Throwable, Future[U]]): Future[U] = { + def tryRecover[U >: T](pf: PartialFunction[Throwable, Future[U]]): Future[U] = { val p = newPromise[U] onComplete { @@ -349,6 +349,31 @@ self => p.future } + /** Zips the values of `this` and `that` future, and creates + * a new future holding the tuple of their results. + * + * If `this` future fails, the resulting future is failed + * with the throwable stored in `this`. + * Otherwise, if `that` future fails, the resulting future is failed + * with the throwable stored in `that`. + */ + def zip[U](that: Future[U]): Future[(T, U)] = { + val p = newPromise[(T, U)] + + this onComplete { + case Left(t) => p failure t + case Right(r) => that onSuccess { + case r2 => p success ((r, r2)) + } + } + + that onFailure { + case f => p failure f + } + + p.future + } + /** Creates a new future which holds the result of this future if it was completed successfully, or, if not, * the result of the `that` future if `that` is completed successfully. * If both futures are failed, the resulting future holds the throwable object of the first future. @@ -412,26 +437,6 @@ self => p.future } - /** Executes a piece of code once this future is completed, regardless of whether - * or not the future fails or succeeds, and returns a new future with the result of this - * future. - * - * This method allows one to enforce ordering. - * - * The below example always executes the `println` calls in order: - * {{{ - * val f = future { 5 } - * f ensure { - * println("The value is available.") - * } ensure { - * println("The application can now end.") - * } - * }}} - */ - def ensure[U](body: =>U): Future[T] = andThen { - case _ => body - } - /** Creates a new future which holds the result of either this future or `that` future, depending on * which future was completed first. * diff --git a/src/library/scala/concurrent/akka/ExecutionContextImpl.scala b/src/library/scala/concurrent/akka/ExecutionContextImpl.scala index 922d77189c..e2773e1fd5 100644 --- a/src/library/scala/concurrent/akka/ExecutionContextImpl.scala +++ b/src/library/scala/concurrent/akka/ExecutionContextImpl.scala @@ -22,7 +22,7 @@ class ExecutionContextImpl(executorService: ExecutorService) extends ExecutionCo def execute(runnable: Runnable): Unit = executorService match { // case fj: ForkJoinPool => - // // TODO fork if more applicable + // TODO fork if more applicable // executorService execute runnable case _ => executorService execute runnable @@ -60,6 +60,10 @@ class ExecutionContextImpl(executorService: ExecutorService) extends ExecutionCo } } + def reportFailure(t: Throwable) { + t.printStackTrace() + } + /** Only callable from the tasks running on the same execution context. */ private def blockingCall[T](body: Awaitable[T]): T = { releaseStack() @@ -104,7 +108,7 @@ class ExecutionContextImpl(executorService: ExecutorService) extends ExecutionCo } catch { case e => // TODO catching all and continue isn't good for OOME - e.printStackTrace() + reportFailure(e) } } } finally { diff --git a/src/library/scala/concurrent/akka/Promise.scala b/src/library/scala/concurrent/akka/Promise.scala index 923d5baf6d..fad78478a5 100644 --- a/src/library/scala/concurrent/akka/Promise.scala +++ b/src/library/scala/concurrent/akka/Promise.scala @@ -199,7 +199,7 @@ object Promise { try { func(result) } catch { - case e => e.printStackTrace() + case e => executor.reportFailure(e) } } } diff --git a/src/library/scala/concurrent/default/TaskImpl.scala b/src/library/scala/concurrent/default/TaskImpl.scala index 3e52d79894..7ac297de9e 100644 --- a/src/library/scala/concurrent/default/TaskImpl.scala +++ b/src/library/scala/concurrent/default/TaskImpl.scala @@ -290,6 +290,8 @@ private[concurrent] final class ExecutionContextImpl extends ExecutionContext { res } + def reportFailure(t: Throwable): Unit = {} + } diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala index 6c1e323595..5c3f1f818d 100644 --- a/src/library/scala/concurrent/package.scala +++ b/src/library/scala/concurrent/package.scala @@ -121,11 +121,6 @@ package object concurrent { object nondeterministic { } - final class DurationOps private[concurrent] (x: Int) { - // TODO ADD OTHERS - def ns = util.Duration.fromNanos(0) - } - @inline implicit final def int2durationops(x: Int) = new DurationOps(x) } @@ -144,13 +139,10 @@ package concurrent { def this(origin: Future[_]) = this(origin, "Future timed out.") } - /** Evidence that the program can be nondeterministic. - * - * Programs in which such an evidence is available in scope - * can contain calls to methods which yield nondeterministic - * programs. - */ - sealed trait NonDeterministic + final class DurationOps private[concurrent] (x: Int) { + // TODO ADD OTHERS + def ns = util.Duration.fromNanos(0) + } } -- cgit v1.2.3