diff options
Diffstat (limited to 'src/library/scala/concurrent/Future.scala')
-rw-r--r-- | src/library/scala/concurrent/Future.scala | 492 |
1 files changed, 492 insertions, 0 deletions
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala new file mode 100644 index 0000000000..eb54b61db0 --- /dev/null +++ b/src/library/scala/concurrent/Future.scala @@ -0,0 +1,492 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent + + + +import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable } +import java.util.concurrent.TimeUnit.{ NANOSECONDS => NANOS, MILLISECONDS ⇒ MILLIS } +import java.lang.{ Iterable => JIterable } +import java.util.{ LinkedList => JLinkedList } +import java.{ lang => jl } +import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean } + +import scala.util.{ Timeout, Duration, Try, Success, Failure } +import scala.Option + +import scala.annotation.tailrec +import scala.collection.mutable.Stack +import scala.collection.mutable.Builder +import scala.collection.generic.CanBuildFrom + + + +/** The trait that represents futures. + * + * Asynchronous computations that yield futures are created with the `future` call: + * + * {{{ + * val s = "Hello" + * val f: Future[String] = future { + * s + " future!" + * } + * f onSuccess { + * case msg => println(msg) + * } + * }}} + * + * @author Philipp Haller, Heather Miller, Aleksandar Prokopec, Viktor Klang + * + * @define multipleCallbacks + * Multiple callbacks may be registered; there is no guarantee that they will be + * executed in a particular order. + * + * @define caughtThrowables + * The future may contain a throwable object and this means that the future failed. + * Futures obtained through combinators have the same exception as the future they were obtained from. + * The following throwable objects are not contained in the future: + * - `Error` - errors are not contained within futures + * - `InterruptedException` - not contained within futures + * - all `scala.util.control.ControlThrowable` except `NonLocalReturnControl` - not contained within futures + * + * Instead, the future is completed with a ExecutionException with one of the exceptions above + * as the cause. + * If a future is failed with a `scala.runtime.NonLocalReturnControl`, + * it is completed with a value instead from that throwable instead instead. + * + * @define nonDeterministic + * Note: using this method yields nondeterministic dataflow programs. + * + * @define forComprehensionExamples + * Example: + * + * {{{ + * val f = future { 5 } + * val g = future { 3 } + * val h = for { + * x: Int <- f // returns Future(5) + * y: Int <- g // returns Future(5) + * } yield x + y + * }}} + * + * is translated to: + * + * {{{ + * f flatMap { (x: Int) => g map { (y: Int) => x + y } } + * }}} + */ +trait Future[+T] extends Awaitable[T] { +self => + + /* Callbacks */ + + /** When this future is completed successfully (i.e. with a value), + * apply the provided partial function to the value if the partial function + * is defined at that value. + * + * If the future has already been completed with a value, + * this will either be applied immediately or be scheduled asynchronously. + * + * $multipleCallbacks + */ + def onSuccess[U](pf: PartialFunction[T, U]): this.type = onComplete { + case Failure(t) => // do nothing + case Success(v) => if (pf isDefinedAt v) pf(v) else { /*do nothing*/ } + } + + /** When this future is completed with a failure (i.e. with a throwable), + * apply the provided callback to the throwable. + * + * $caughtThrowables + * + * If the future has already been completed with a failure, + * this will either be applied immediately or be scheduled asynchronously. + * + * Will not be called in case that the future is completed with a value. + * + * $multipleCallbacks + */ + def onFailure[U](callback: PartialFunction[Throwable, U]): this.type = onComplete { + case Failure(t) => if (isFutureThrowable(t) && callback.isDefinedAt(t)) callback(t) else { /*do nothing*/ } + case Success(v) => // do nothing + } + + /** When this future is completed, either through an exception, a timeout, or a value, + * apply the provided function. + * + * If the future has already been completed, + * this will either be applied immediately or be scheduled asynchronously. + * + * $multipleCallbacks + */ + def onComplete[U](func: Try[T] => U): this.type + + + /* Miscellaneous */ + + /** Creates a new promise. + */ + def newPromise[S]: Promise[S] + + + /* Projections */ + + /** Returns a failed projection of this future. + * + * The failed projection is a future holding a value of type `Throwable`. + * + * It is completed with a value which is the throwable of the original future + * in case the original future is failed. + * + * It is failed with a `NoSuchElementException` if the original future is completed successfully. + * + * Blocking on this future returns a value if the original future is completed with an exception + * and throws a corresponding exception if the original future fails. + */ + def failed: Future[Throwable] = { + def noSuchElem(v: T) = + new NoSuchElementException("Future.failed not completed with a throwable. Instead completed with: " + v) + + val p = newPromise[Throwable] + + onComplete { + case Failure(t) => p success t + case Success(v) => p failure noSuchElem(v) + } + + p.future + } + + + /* Monadic operations */ + + /** Asynchronously processes the value in the future once the value becomes available. + * + * Will not be called if the future fails. + */ + def foreach[U](f: T => U): Unit = onComplete { + case Success(r) => f(r) + case Failure(_) => // do nothing + } + + /** Creates a new future by applying a function to the successful result of + * this future. If this future is completed with an exception then the new + * future will also contain this exception. + * + * $forComprehensionExample + */ + def map[S](f: T => S): Future[S] = { + val p = newPromise[S] + + onComplete { + case Failure(t) => p failure t + case Success(v) => + try p success f(v) + catch { + case t => p complete resolver(t) + } + } + + p.future + } + + /** Creates a new future by applying a function to the successful result of + * this future, and returns the result of the function as the new future. + * If this future is completed with an exception then the new future will + * also contain this exception. + * + * $forComprehensionExample + */ + def flatMap[S](f: T => Future[S]): Future[S] = { + val p = newPromise[S] + + onComplete { + case Failure(t) => p failure t + case Success(v) => + try { + f(v) onComplete { + case Failure(t) => p failure t + case Success(v) => p success v + } + } catch { + case t: Throwable => p complete resolver(t) + } + } + + p.future + } + + /** Creates a new future by filtering the value of the current future with a predicate. + * + * If the current future contains a value which satisfies the predicate, the new future will also hold that value. + * Otherwise, the resulting future will fail with a `NoSuchElementException`. + * + * If the current future fails or times out, the resulting future also fails or times out, respectively. + * + * Example: + * {{{ + * val f = future { 5 } + * val g = f filter { _ % 2 == 1 } + * val h = f filter { _ % 2 == 0 } + * await(0) g // evaluates to 5 + * await(0) h // throw a NoSuchElementException + * }}} + */ + def filter(pred: T => Boolean): Future[T] = { + val p = newPromise[T] + + onComplete { + case Failure(t) => p failure t + case Success(v) => + try { + if (pred(v)) p success v + else p failure new NoSuchElementException("Future.filter predicate is not satisfied by: " + v) + } catch { + case t: Throwable => p complete resolver(t) + } + } + + p.future + } + + /** Creates a new future by mapping the value of the current future if the given partial function is defined at that value. + * + * If the current future contains a value for which the partial function is defined, the new future will also hold that value. + * Otherwise, the resulting future will fail with a `NoSuchElementException`. + * + * If the current future fails or times out, the resulting future also fails or times out, respectively. + * + * Example: + * {{{ + * val f = future { -5 } + * val g = f collect { + * case x if x < 0 => -x + * } + * val h = f collect { + * case x if x > 0 => x * 2 + * } + * await(0) g // evaluates to 5 + * await(0) h // throw a NoSuchElementException + * }}} + */ + def collect[S](pf: PartialFunction[T, S]): Future[S] = { + val p = newPromise[S] + + onComplete { + case Failure(t) => p failure t + case Success(v) => + try { + if (pf.isDefinedAt(v)) p success pf(v) + else p failure new NoSuchElementException("Future.collect partial function is not defined at: " + v) + } catch { + case t: Throwable => p complete resolver(t) + } + } + + p.future + } + + /** Creates a new future that will handle any matching throwable that this + * future might contain. If there is no match, or if this future contains + * a valid result then the new future will contain the same. + * + * Example: + * + * {{{ + * future (6 / 0) recover { case e: ArithmeticException ⇒ 0 } // result: 0 + * future (6 / 0) recover { case e: NotFoundException ⇒ 0 } // result: exception + * future (6 / 2) recover { case e: ArithmeticException ⇒ 0 } // result: 3 + * }}} + */ + def recover[U >: T](pf: PartialFunction[Throwable, U]): Future[U] = { + val p = newPromise[U] + + onComplete { + case Failure(t) if pf isDefinedAt t => + try { p success pf(t) } + catch { case t: Throwable => p complete resolver(t) } + case otherwise => p complete otherwise + } + + p.future + } + + /** Creates a new future that will handle any matching throwable that this + * future might contain by assigning it a value of another future. + * + * If there is no match, or if this future contains + * a valid result then the new future will contain the same result. + * + * Example: + * + * {{{ + * val f = future { Int.MaxValue } + * future (6 / 0) recoverWith { case e: ArithmeticException => f } // result: Int.MaxValue + * }}} + */ + def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]]): Future[U] = { + val p = newPromise[U] + + onComplete { + case Failure(t) if pf isDefinedAt t => + try { + p completeWith pf(t) + } catch { + case t: Throwable => p complete resolver(t) + } + case otherwise => p complete otherwise + } + + p.future + } + + /** Zips the values of `this` and `that` future, and creates + * a new future holding the tuple of their results. + * + * If `this` future fails, the resulting future is failed + * with the throwable stored in `this`. + * Otherwise, if `that` future fails, the resulting future is failed + * with the throwable stored in `that`. + */ + def zip[U](that: Future[U]): Future[(T, U)] = { + val p = newPromise[(T, U)] + + this onComplete { + case Failure(t) => p failure t + case Success(r) => that onSuccess { + case r2 => p success ((r, r2)) + } + } + + that onFailure { + case f => p failure f + } + + p.future + } + + /** Creates a new future which holds the result of this future if it was completed successfully, or, if not, + * the result of the `that` future if `that` is completed successfully. + * If both futures are failed, the resulting future holds the throwable object of the first future. + * + * Using this method will not cause concurrent programs to become nondeterministic. + * + * Example: + * {{{ + * val f = future { sys.error("failed") } + * val g = future { 5 } + * val h = f orElse g + * await(0) h // evaluates to 5 + * }}} + */ + def fallbackTo[U >: T](that: Future[U]): Future[U] = { + val p = newPromise[U] + + onComplete { + case Failure(t) => that onComplete { + case Failure(_) => p failure t + case Success(v) => p success v + } + case Success(v) => p success v + } + + p.future + } + + /** Applies the side-effecting function to the result of this future, and returns + * a new future with the result of this future. + * + * This method allows one to enforce that the callbacks are executed in a + * specified order. + * + * Note that if one of the chained `andThen` callbacks throws + * an exception, that exception is not propagated to the subsequent `andThen` + * callbacks. Instead, the subsequent `andThen` callbacks are given the original + * value of this future. + * + * The following example prints out `5`: + * + * {{{ + * val f = future { 5 } + * f andThen { + * case r => sys.error("runtime exception") + * } andThen { + * case Failure(t) => println(t) + * case Success(v) => println(v) + * } + * }}} + */ + def andThen[U](pf: PartialFunction[Try[T], U]): Future[T] = { + val p = newPromise[T] + + onComplete { + case r => + try if (pf isDefinedAt r) pf(r) + finally p complete r + } + + p.future + } + + /** Creates a new future which holds the result of either this future or `that` future, depending on + * which future was completed first. + * + * $nonDeterministic + * + * Example: + * {{{ + * val f = future { sys.error("failed") } + * val g = future { 5 } + * val h = f either g + * await(0) h // evaluates to either 5 or throws a runtime exception + * }}} + */ + def either[U >: T](that: Future[U]): Future[U] = { + val p = self.newPromise[U] + + val completePromise: PartialFunction[Try[U], _] = { + case Failure(t) => p tryFailure t + case Success(v) => p trySuccess v + } + + self onComplete completePromise + that onComplete completePromise + + p.future + } + +} + + + +/** TODO some docs + * + * @define nonDeterministic + * Note: using this method yields nondeterministic dataflow programs. + */ +object Future { + + // TODO make more modular by encoding all other helper methods within the execution context + /** TODO some docs + */ + def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]], ec: ExecutionContext): Future[Coll[T]] = + ec.all[T, Coll](futures) + + // move this to future companion object + @inline def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = executor.future(body) + + def any[T](futures: Traversable[Future[T]])(implicit ec: ExecutionContext): Future[T] = ec.any(futures) + + def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean)(implicit ec: ExecutionContext): Future[Option[T]] = ec.find(futures)(predicate) + +} + + + + |