summaryrefslogtreecommitdiff
path: root/src/library/scala/concurrent/Future.scala
blob: 9aaf05dbd6d76d4fc6f59607a2f18ca071db3bdc (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2003-2011, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */

package scala.concurrent



import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable }
import java.util.concurrent.TimeUnit.{ NANOSECONDS => NANOS, MILLISECONDS  MILLIS }
import java.lang.{ Iterable => JIterable }
import java.util.{ LinkedList => JLinkedList }
import java.{ lang => jl }
import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean }

import scala.concurrent.util.Duration
import scala.concurrent.impl.NonFatal
import scala.Option

import scala.annotation.tailrec
import scala.collection.mutable.Stack
import scala.collection.mutable.Builder
import scala.collection.generic.CanBuildFrom
import language.higherKinds



/** The trait that represents futures.
 *
 *  Asynchronous computations that yield futures are created with the `future` call:
 *
 *  {{{
 *  val s = "Hello"
 *  val f: Future[String] = future {
 *    s + " future!"
 *  }
 *  f onSuccess {
 *    case msg => println(msg)
 *  }
 *  }}}
 *
 *  @author  Philipp Haller, Heather Miller, Aleksandar Prokopec, Viktor Klang
 *
 *  @define multipleCallbacks
 *  Multiple callbacks may be registered; there is no guarantee that they will be
 *  executed in a particular order.
 *
 *  @define caughtThrowables
 *  The future may contain a throwable object and this means that the future failed.
 *  Futures obtained through combinators have the same exception as the future they were obtained from.
 *  The following throwable objects are not contained in the future:
 *  - `Error` - errors are not contained within futures
 *  - `InterruptedException` - not contained within futures
 *  - all `scala.util.control.ControlThrowable` except `NonLocalReturnControl` - not contained within futures
 *
 *  Instead, the future is completed with a ExecutionException with one of the exceptions above
 *  as the cause.
 *  If a future is failed with a `scala.runtime.NonLocalReturnControl`,
 *  it is completed with a value instead from that throwable instead instead.
 *
 *  @define nonDeterministic
 *  Note: using this method yields nondeterministic dataflow programs.
 *
 *  @define forComprehensionExamples
 *  Example:
 *
 *  {{{
 *  val f = future { 5 }
 *  val g = future { 3 }
 *  val h = for {
 *    x: Int <- f // returns Future(5)
 *    y: Int <- g // returns Future(5)
 *  } yield x + y
 *  }}}
 *
 *  is translated to:
 *
 *  {{{
 *  f flatMap { (x: Int) => g map { (y: Int) => x + y } }
 *  }}}
 */
trait Future[+T] extends Awaitable[T] {

  /* Callbacks */

  /** When this future is completed successfully (i.e. with a value),
   *  apply the provided partial function to the value if the partial function
   *  is defined at that value.
   *
   *  If the future has already been completed with a value,
   *  this will either be applied immediately or be scheduled asynchronously.
   *
   *  $multipleCallbacks
   */
  def onSuccess[U](pf: PartialFunction[T, U]): this.type = onComplete {
    case Left(t) => // do nothing
    case Right(v) => if (pf isDefinedAt v) pf(v) else { /*do nothing*/ }
  }

  /** When this future is completed with a failure (i.e. with a throwable),
   *  apply the provided callback to the throwable.
   *
   *  $caughtThrowables
   *
   *  If the future has already been completed with a failure,
   *  this will either be applied immediately or be scheduled asynchronously.
   *
   *  Will not be called in case that the future is completed with a value.
   *
   *  $multipleCallbacks
   */
  def onFailure[U](callback: PartialFunction[Throwable, U]): this.type = onComplete {
    case Left(t) => if (isFutureThrowable(t) && callback.isDefinedAt(t)) callback(t) else { /*do nothing*/ }
    case Right(v) => // do nothing
  }

  /** When this future is completed, either through an exception, or a value,
   *  apply the provided function.
   *
   *  If the future has already been completed,
   *  this will either be applied immediately or be scheduled asynchronously.
   *
   *  $multipleCallbacks
   */
  def onComplete[U](func: Either[Throwable, T] => U): this.type


  /* Miscellaneous */

  /** Creates a new promise.
   */
  protected def newPromise[S]: Promise[S]
  
  /** Returns whether the future has already been completed with
   *  a value or an exception.
   *  
   *  $nonDeterministic
   *  
   *  @return    `true` if the future is already completed, `false` otherwise
   */
  def isCompleted: Boolean
  
  /** The value of this `Future`.
   *  
   *  If the future is not completed the returned value will be `None`.
   *  If the future is completed the value will be `Some(Success(t))`
   *  if it contains a valid result, or `Some(Failure(error))` if it contains
   *  an exception.
   */
  def value: Option[Either[Throwable, T]]
  
  
  /* Projections */

  /** Returns a failed projection of this future.
   *
   *  The failed projection is a future holding a value of type `Throwable`.
   *
   *  It is completed with a value which is the throwable of the original future
   *  in case the original future is failed.
   *
   *  It is failed with a `NoSuchElementException` if the original future is completed successfully.
   *
   *  Blocking on this future returns a value if the original future is completed with an exception
   *  and throws a corresponding exception if the original future fails.
   */
  def failed: Future[Throwable] = {
    val p = newPromise[Throwable]

    onComplete {
      case Left(t) => p success t
      case Right(v) => p failure (new NoSuchElementException("Future.failed not completed with a throwable. Instead completed with: " + v))
    }

    p.future
  }


  /* Monadic operations */

  /** Asynchronously processes the value in the future once the value becomes available.
   *
   *  Will not be called if the future fails.
   */
  def foreach[U](f: T => U): Unit = onComplete {
    case Right(r) => f(r)
    case Left(_)  => // do nothing
  }

  /** Creates a new future by applying a function to the successful result of
   *  this future. If this future is completed with an exception then the new
   *  future will also contain this exception.
   *
   *  $forComprehensionExample
   */
  def map[S](f: T => S): Future[S] = {
    val p = newPromise[S]

    onComplete {
      case Left(t) => p failure t
      case Right(v) =>
        try p success f(v)
        catch {
          case NonFatal(t) => p complete resolver(t)
        }
    }

    p.future
  }

  /** Creates a new future by applying a function to the successful result of
   *  this future, and returns the result of the function as the new future.
   *  If this future is completed with an exception then the new future will
   *  also contain this exception.
   *
   *  $forComprehensionExample
   */
  def flatMap[S](f: T => Future[S]): Future[S] = {
    val p = newPromise[S]

    onComplete {
      case Left(t) => p failure t
      case Right(v) =>
        try {
          f(v) onComplete {
            case Left(t) => p failure t
            case Right(v) => p success v
          }
        } catch {
          case NonFatal(t) => p complete resolver(t)
        }
    }

    p.future
  }

  /** Creates a new future by filtering the value of the current future with a predicate.
   *
   *  If the current future contains a value which satisfies the predicate, the new future will also hold that value.
   *  Otherwise, the resulting future will fail with a `NoSuchElementException`.
   *
   *  If the current future fails, then the resulting future also fails.
   *
   *  Example:
   *  {{{
   *  val f = future { 5 }
   *  val g = f filter { _ % 2 == 1 }
   *  val h = f filter { _ % 2 == 0 }
   *  await(g, 0) // evaluates to 5
   *  await(h, 0) // throw a NoSuchElementException
   *  }}}
   */
  def filter(pred: T => Boolean): Future[T] = {
    val p = newPromise[T]

    onComplete {
      case Left(t) => p failure t
      case Right(v) =>
        try {
          if (pred(v)) p success v
          else p failure new NoSuchElementException("Future.filter predicate is not satisfied by: " + v)
        } catch {
          case NonFatal(t) => p complete resolver(t)
        }
    }

    p.future
  }

  /** Used by for-comprehensions.
   */
  final def withFilter(p: T => Boolean): Future[T] = filter(p)
  // final def withFilter(p: T => Boolean) = new FutureWithFilter[T](this, p)

  // final class FutureWithFilter[+S](self: Future[S], p: S => Boolean) {
  //   def foreach(f: S => Unit): Unit = self filter p foreach f
  //   def map[R](f: S => R) = self filter p map f
  //   def flatMap[R](f: S => Future[R]) = self filter p flatMap f
  //   def withFilter(q: S => Boolean): FutureWithFilter[S] = new FutureWithFilter[S](self, x => p(x) && q(x))
  // }

  /** Creates a new future by mapping the value of the current future, if the given partial function is defined at that value.
   *
   *  If the current future contains a value for which the partial function is defined, the new future will also hold that value.
   *  Otherwise, the resulting future will fail with a `NoSuchElementException`.
   *
   *  If the current future fails, then the resulting future also fails.
   *
   *  Example:
   *  {{{
   *  val f = future { -5 }
   *  val g = f collect {
   *    case x if x < 0 => -x
   *  }
   *  val h = f collect {
   *    case x if x > 0 => x * 2
   *  }
   *  await(g, 0) // evaluates to 5
   *  await(h, 0) // throw a NoSuchElementException
   *  }}}
   */
  def collect[S](pf: PartialFunction[T, S]): Future[S] = {
    val p = newPromise[S]

    onComplete {
      case Left(t) => p failure t
      case Right(v) =>
        try {
          if (pf.isDefinedAt(v)) p success pf(v)
          else p failure new NoSuchElementException("Future.collect partial function is not defined at: " + v)
        } catch {
          case NonFatal(t) => p complete resolver(t)
        }
    }

    p.future
  }

  /** Creates a new future that will handle any matching throwable that this
   *  future might contain. If there is no match, or if this future contains
   *  a valid result then the new future will contain the same.
   *
   *  Example:
   *
   *  {{{
   *  future (6 / 0) recover { case e: ArithmeticException ⇒ 0 } // result: 0
   *  future (6 / 0) recover { case e: NotFoundException   ⇒ 0 } // result: exception
   *  future (6 / 2) recover { case e: ArithmeticException ⇒ 0 } // result: 3
   *  }}}
   */
  def recover[U >: T](pf: PartialFunction[Throwable, U]): Future[U] = {
    val p = newPromise[U]

    onComplete {
      case Left(t) if pf isDefinedAt t =>
        try { p success pf(t) }
        catch { case NonFatal(t) => p complete resolver(t) }
      case otherwise => p complete otherwise
    }

    p.future
  }

  /** Creates a new future that will handle any matching throwable that this
   *  future might contain by assigning it a value of another future.
   *
   *  If there is no match, or if this future contains
   *  a valid result then the new future will contain the same result.
   *
   *  Example:
   *
   *  {{{
   *  val f = future { Int.MaxValue }
   *  future (6 / 0) recoverWith { case e: ArithmeticException => f } // result: Int.MaxValue
   *  }}}
   */
  def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]]): Future[U] = {
    val p = newPromise[U]

    onComplete {
      case Left(t) if pf isDefinedAt t =>
        try {
          p completeWith pf(t)
        } catch {
          case NonFatal(t) => p complete resolver(t)
        }
      case otherwise => p complete otherwise
    }

    p.future
  }

  /** Zips the values of `this` and `that` future, and creates
   *  a new future holding the tuple of their results.
   *
   *  If `this` future fails, the resulting future is failed
   *  with the throwable stored in `this`.
   *  Otherwise, if `that` future fails, the resulting future is failed
   *  with the throwable stored in `that`.
   */
  def zip[U](that: Future[U]): Future[(T, U)] = {
    val p = newPromise[(T, U)]

    this onComplete {
      case Left(t)  => p failure t
      case Right(r) => that onSuccess {
        case r2 => p success ((r, r2))
      }
    }

    that onFailure {
      case f => p failure f
    }

    p.future
  }

  /** Creates a new future which holds the result of this future if it was completed successfully, or, if not,
   *  the result of the `that` future if `that` is completed successfully.
   *  If both futures are failed, the resulting future holds the throwable object of the first future.
   *
   *  Using this method will not cause concurrent programs to become nondeterministic.
   *
   *  Example:
   *  {{{
   *  val f = future { sys.error("failed") }
   *  val g = future { 5 }
   *  val h = f fallbackTo g
   *  await(h, 0) // evaluates to 5
   *  }}}
   */
  def fallbackTo[U >: T](that: Future[U]): Future[U] = {
    val p = newPromise[U]
    onComplete {
      case r @ Right(_)  p complete r
      case _             p completeWith that
    }
    p.future
  }
  
  /** Creates a new `Future[S]` which is completed with this `Future`'s result if
   *  that conforms to `S`'s erased type or a `ClassCastException` otherwise.
   */
  def mapTo[S](implicit m: Manifest[S]): Future[S] = {
    import java.{ lang => jl }
    val toBoxed = Map[Class[_], Class[_]](
      classOf[Boolean] -> classOf[jl.Boolean],
      classOf[Byte]    -> classOf[jl.Byte],
      classOf[Char]    -> classOf[jl.Character],
      classOf[Short]   -> classOf[jl.Short],
      classOf[Int]     -> classOf[jl.Integer],
      classOf[Long]    -> classOf[jl.Long],
      classOf[Float]   -> classOf[jl.Float],
      classOf[Double]  -> classOf[jl.Double],
      classOf[Unit]    -> classOf[scala.runtime.BoxedUnit]
    )

    def boxedType(c: Class[_]): Class[_] = {
      if (c.isPrimitive) toBoxed(c) else c
    }

    val p = newPromise[S]
    
    onComplete {
      case l: Left[Throwable, _] => p complete l.asInstanceOf[Either[Throwable, S]]
      case Right(t) =>
        p complete (try {
          Right(boxedType(m.erasure).cast(t).asInstanceOf[S])
        } catch {
          case e: ClassCastException => Left(e)
        })
    }
    
    p.future
  }
  
  /** Applies the side-effecting function to the result of this future, and returns
   *  a new future with the result of this future.
   *
   *  This method allows one to enforce that the callbacks are executed in a
   *  specified order.
   *
   *  Note that if one of the chained `andThen` callbacks throws
   *  an exception, that exception is not propagated to the subsequent `andThen`
   *  callbacks. Instead, the subsequent `andThen` callbacks are given the original
   *  value of this future.
   *
   *  The following example prints out `5`:
   *
   *  {{{
   *  val f = future { 5 }
   *  f andThen {
   *    case r => sys.error("runtime exception")
   *  } andThen {
   *    case Failure(t) => println(t)
   *    case Success(v) => println(v)
   *  }
   *  }}}
   */
  def andThen[U](pf: PartialFunction[Either[Throwable, T], U]): Future[T] = {
    val p = newPromise[T]

    onComplete {
      case r =>
        try if (pf isDefinedAt r) pf(r)
        finally p complete r
    }

    p.future
  }

  /** Creates a new future which holds the result of either this future or `that` future, depending on
   *  which future was completed first.
   *
   *  $nonDeterministic
   *
   *  Example:
   *  {{{
   *  val f = future { sys.error("failed") }
   *  val g = future { 5 }
   *  val h = f either g
   *  await(h, 0) // evaluates to either 5 or throws a runtime exception
   *  }}}
   */
  def either[U >: T](that: Future[U]): Future[U] = {
    val p = newPromise[U]

    val completePromise: PartialFunction[Either[Throwable, U], _] = {
      case Left(t) => p tryFailure t
      case Right(v) => p trySuccess v
    }

    this onComplete completePromise
    that onComplete completePromise

    p.future
  }

}



/** Future companion object.
 *
 *  @define nonDeterministic
 *  Note: using this method yields nondeterministic dataflow programs.
 */
object Future {
 /** Starts an asynchronous computation and returns a `Future` object with the result of that computation.
  *
  *  The result becomes available once the asynchronous computation is completed.
  *
  *  @tparam T       the type of the result
  *  @param body     the asychronous computation
  *  @param execctx  the execution context on which the future is run
  *  @return         the `Future` holding the result of the computation
  */
  def apply[T](body: =>T)(implicit execctx: ExecutionContext): Future[T] = impl.Future(body)

  import scala.collection.mutable.Builder
  import scala.collection.generic.CanBuildFrom

  /** Simple version of `Futures.traverse`. Transforms a `Traversable[Future[A]]` into a `Future[Traversable[A]]`.
   *  Useful for reducing many `Future`s into a single `Future`.
   */
  def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = {
    in.foldLeft(Promise.successful(cbf(in)).future) {
      (fr, fa) => for (r <- fr; a <- fa.asInstanceOf[Future[A]]) yield (r += a)
    } map (_.result)
  }

  /** Returns a `Future` to the result of the first future in the list that is completed.
   */
  def firstCompletedOf[T](futures: Traversable[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
    val p = Promise[T]()

    val completeFirst: Either[Throwable, T] => Unit = p tryComplete _
    futures.foreach(_ onComplete completeFirst)

    p.future
  }

  /** Returns a `Future` that will hold the optional result of the first `Future` with a result that matches the predicate.
   */
  def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = {
    if (futures.isEmpty) Promise.successful[Option[T]](None).future
    else {
      val result = Promise[Option[T]]()
      val ref = new AtomicInteger(futures.size)
      val search: Either[Throwable, T] => Unit = v => try {
        v match {
          case Right(r) => if (predicate(r)) result tryComplete Right(Some(r))
          case _        =>
        }
      } finally {
        if (ref.decrementAndGet == 0)
          result tryComplete Right(None)
      }

      futures.foreach(_ onComplete search)

      result.future
    }
  }

  /** A non-blocking fold over the specified futures, with the start value of the given zero.
   *  The fold is performed on the thread where the last future is completed,
   *  the result will be the first failure of any of the futures, or any failure in the actual fold,
   *  or the result of the fold.
   *  
   *  Example:
   *  {{{
   *    val result = Await.result(Future.fold(futures)(0)(_ + _), 5 seconds)
   *  }}}
   */
  def fold[T, R](futures: Traversable[Future[T]])(zero: R)(foldFun: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = {
    if (futures.isEmpty) Promise.successful(zero).future
    else sequence(futures).map(_.foldLeft(zero)(foldFun))
  }

  /** Initiates a fold over the supplied futures where the fold-zero is the result value of the `Future` that's completed first.
   *  
   *  Example:
   *  {{{
   *    val result = Await.result(Futures.reduce(futures)(_ + _), 5 seconds)
   *  }}}
   */
  def reduce[T, R >: T](futures: Traversable[Future[T]])(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = {
    if (futures.isEmpty) Promise[R].failure(new NoSuchElementException("reduce attempted on empty collection")).future
    else sequence(futures).map(_ reduceLeft op)
  }
  
  /** Transforms a `Traversable[A]` into a `Future[Traversable[B]]` using the provided function `A => Future[B]`.
   *  This is useful for performing a parallel map. For example, to apply a function to all items of a list
   *  in parallel:
   *  
   *  {{{
   *    val myFutureList = Future.traverse(myList)(x => Future(myFunc(x)))
   *  }}}
   */
  def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
    in.foldLeft(Promise.successful(cbf(in)).future) { (fr, a) =>
      val fb = fn(a.asInstanceOf[A])
      for (r <- fr; b <- fb) yield (r += b)
    }.map(_.result)
  
}