From d595324efe2be1c552bad8201aaef9ce383e5c95 Mon Sep 17 00:00:00 2001 From: aleksandar Date: Thu, 12 Jan 2012 16:20:25 +0100 Subject: Refactor await calls for awaitable objects to ready and result calls. --- src/library/scala/concurrent/Future.scala | 123 ++++++++++--------------- src/library/scala/concurrent/akka/Future.scala | 16 ++++ src/library/scala/concurrent/package.scala | 15 ++- 3 files changed, 74 insertions(+), 80 deletions(-) create mode 100644 src/library/scala/concurrent/akka/Future.scala (limited to 'src') diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index ada6736132..748d08be9f 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -1,29 +1,32 @@ - -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ package scala.concurrent -//import akka.AkkaException (replaced with Exception) -//import akka.event.Logging.Error (removed all logging) + + import scala.util.{ Timeout, Duration } import scala.Option -//import akka.japi.{ Procedure, Function ⇒ JFunc, Option ⇒ JOption } (commented methods) import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable } -import java.util.concurrent.TimeUnit.{ NANOSECONDS ⇒ NANOS, MILLISECONDS ⇒ MILLIS } -import java.lang.{ Iterable ⇒ JIterable } -import java.util.{ LinkedList ⇒ JLinkedList } +import java.util.concurrent.TimeUnit.{ NANOSECONDS => NANOS, MILLISECONDS ⇒ MILLIS } +import java.lang.{ Iterable => JIterable } +import java.util.{ LinkedList => JLinkedList } import scala.annotation.tailrec import scala.collection.mutable.Stack -//import akka.util.Switch (commented method) -import java.{ lang ⇒ jl } +import java.{ lang => jl } import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean } import scala.collection.mutable.Builder import scala.collection.generic.CanBuildFrom + + /** The trait that represents futures. * * @define multipleCallbacks @@ -95,20 +98,6 @@ self => case Right(v) => // do nothing } - /* To be removed - /** When this future times out, apply the provided function. - * - * If the future has already timed out, - * this will either be applied immediately or be scheduled asynchronously. - * - * $multipleCallbacks - */ - def onTimeout[U](callback: FutureTimeoutException => U): this.type = onComplete { - case Left(te: FutureTimeoutException) => callback(te) - case Right(v) => // do nothing - } - */ - /** When this future is completed, either through an exception, a timeout, or a value, * apply the provided function. * @@ -130,14 +119,6 @@ self => */ def newPromise[S]: Promise[S] = executionContext promise - /* - /** Tests whether this `Future`'s timeout has expired. - * - * $futureTimeout - */ - def isTimedout: Boolean - */ - /* Projections */ @@ -176,46 +157,6 @@ self => new NoSuchElementException("Future.failed not completed with a throwable. Instead completed with: " + v) } - /* - /** A timed out projection of this future. - * - * The timed out projection is a future holding a value of type `FutureTimeoutException`. - * - * It is completed with a value which is a `FutureTimeoutException` of the original future - * in case the original future is timed out. - * - * It is failed with a `NoSuchElementException` if the original future is completed successfully. - * It is failed with the original exception otherwise. - * - * Blocking on this future returns a value only if the original future timed out, and a - * corresponding exception otherwise. - */ - def timedout: Future[FutureTimeoutException] = new Future[FutureTimeoutException] { - def executionContext = self.executionContext - def onComplete[U](func: Either[Throwable, FutureTimeoutException] => U) = { - self.onComplete { - case Left(te: FutureTimeoutException) => func(Right(te)) - case Left(t) => func(Left(noSuchElemThrowable(t))) - case Right(v) => func(Left(noSuchElemValue(v))) - } - this - } - def isTimedout = self.isTimedout - def block()(implicit canblock: CanBlock) = try { - val res = self.block() - throw noSuchElemValue(res) - } catch { - case ft: FutureTimeoutException => - ft - case t: Throwable => - throw noSuchElemThrowable(t) - } - private def noSuchElemValue(v: T) = - new NoSuchElementException("Future.timedout didn't time out. Instead completed with: " + v) - private def noSuchElemThrowable(v: Throwable) = - new NoSuchElementException("Future.timedout didn't time out. Instead failed with: " + v) - } - */ /* Monadic operations */ @@ -299,7 +240,7 @@ self => * Example: * {{{ * val f = future { 5 } - * val g = g filter { _ % 2 == 1 } + * val g = f filter { _ % 2 == 1 } * val h = f filter { _ % 2 == 0 } * block on g // evaluates to 5 * block on h // throw a NoSuchElementException @@ -316,6 +257,38 @@ self => p.future } + /** Creates a new future by mapping the value of the current future if the given partial function is defined at that value. + * + * + * If the current future contains a value for which the partial function is defined, the new future will also hold that value. + * Otherwise, the resulting future will fail with a `NoSuchElementException`. + * + * If the current future fails or times out, the resulting future also fails or times out, respectively. + * + * Example: + * {{{ + * val f = future { -5 } + * val g = f collect { + * case x if x < 0 => -x + * } + * val h = f collect { + * case x if x > 0 => x * 2 + * } + * block on g // evaluates to 5 + * block on h // throw a NoSuchElementException + * }}} + */ + def collect[S](pf: PartialFunction[T, S]): Future[S] = { + val p = newPromise[S] + + onComplete { + case Left(t) => p failure t + case Right(v) => if (pf.isDefinedAt(v)) p success pf(v) else p failure new NoSuchElementException("Future.collect partial function is not defined at: " + v) + } + + p.future + } + } diff --git a/src/library/scala/concurrent/akka/Future.scala b/src/library/scala/concurrent/akka/Future.scala new file mode 100644 index 0000000000..e359456736 --- /dev/null +++ b/src/library/scala/concurrent/akka/Future.scala @@ -0,0 +1,16 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent +package akka + + + + + + diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala index 61137fbc6e..666e12456d 100644 --- a/src/library/scala/concurrent/package.scala +++ b/src/library/scala/concurrent/package.scala @@ -67,9 +67,9 @@ package object concurrent { * - InterruptedException - in the case that a wait within the blockable object was interrupted * - TimeoutException - in the case that the blockable object timed out */ - def await[T](timeout: Timeout)(body: =>T): T = await(timeout, new Awaitable[T] { + def await[T](atMost: Duration)(body: =>T): T = result(new Awaitable[T] { def await(timeout: Timeout)(implicit cb: CanAwait) = body - }) + }, atMost) /** Blocks on a blockable object. * @@ -80,13 +80,18 @@ package object concurrent { * - InterruptedException - in the case that a wait within the blockable object was interrupted * - TimeoutException - in the case that the blockable object timed out */ - def await[T](timeout: Timeout, awaitable: Awaitable[T]): T = { + def result[T](awaitable: Awaitable[T], atMost: Duration): T = { currentExecutionContext.get match { - case null => awaitable.await(timeout)(null) // outside - TODO - fix timeout case - case x => x.blockingCall(timeout, awaitable) // inside an execution context thread + case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case + case x => x.blockingCall(atMost, awaitable) // inside an execution context thread } } + def ready[T](awaitable: Awaitable[T], atMost: Duration): Awaitable[T] = { + result(awaitable, atMost) + awaitable + } + } -- cgit v1.2.3