summaryrefslogtreecommitdiff
path: root/src/library/scala/concurrent/Future.scala
blob: 0aa6731353de5dcdfa7fd21c96c7969f4933e39c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
/**
 *  Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
 */

package scala.concurrent

//import akka.AkkaException (replaced with Exception)
//import akka.event.Logging.Error (removed all logging)
import scala.util.{ Timeout, Duration }
import scala.Option
//import akka.japi.{ Procedure, Function ⇒ JFunc, Option ⇒ JOption } (commented methods)

import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable }
import java.util.concurrent.TimeUnit.{ NANOSECONDS  NANOS, MILLISECONDS  MILLIS }
import java.lang.{ Iterable  JIterable }
import java.util.{ LinkedList  JLinkedList }

import scala.annotation.tailrec
import scala.collection.mutable.Stack
//import akka.util.Switch (commented method)
import java.{ lang  jl }
import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean }
import scala.collection.mutable.Builder
import scala.collection.generic.CanBuildFrom

/** The trait that represents futures.
 *  
 *  @define futureTimeout
 *  The timeout of the future is:
 *  - if this future was obtained from a task (i.e. by calling `task.future`), the timeout associated with that task
 *  - if this future was obtained from a promise (i.e. by calling `promise.future`), the timeout associated with that promise
 *
 *  @define multipleCallbacks
 *  Multiple callbacks may be registered; there is no guarantee that they will be
 *  executed in a particular order.
 *
 *  @define caughtThrowables
 *  The future may contain a throwable object and this means that the future failed.
 *  Futures obtained through combinators have the same exception as the future they were obtained from.
 *  The following throwable objects are not contained in the future:
 *  - Error - errors are not contained within futures
 *  - scala.util.control.ControlThrowable - not contained within futures
 *  - InterruptedException - not contained within futures
 *  
 *  Instead, the future is completed with a ExecutionException with one of the exceptions above
 *  as the cause.
 *
 *  @define forComprehensionExamples
 *  Example:
 *  
 *  {{{
 *  val f = future { 5 }
 *  val g = future { 3 }
 *  val h = for {
 *    x: Int <- f // returns Future(5)
 *    y: Int <- g // returns Future(5)
 *  } yield x + y
 *  }}}
 *  
 *  is translated to:
 *  
 *  {{{
 *  f flatMap { (x: Int) => g map { (y: Int) => x + y } }
 *  }}}
 */
trait Future[+T] extends Blockable[T] {
self =>
  
  /* Callbacks */
  
  /** When this future is completed successfully (i.e. with a value),
   *  apply the provided function to the value.
   *  
   *  If the future has already been completed with a value,
   *  this will either be applied immediately or be scheduled asynchronously.
   *  
   *  Will not be called in case of an exception (this includes the FutureTimeoutException).
   *  
   *  $multipleCallbacks
   */
  def onSuccess[U](func: T => U): this.type = onComplete {
    case Left(t) => // do nothing
    case Right(v) => func(v)
  }
  
  /** When this future is completed with a failure (i.e. with a throwable),
   *  apply the provided callback to the throwable.
   *  
   *  $caughtThrowables
   *  
   *  If the future has already been completed with a failure,
   *  this will either be applied immediately or be scheduled asynchronously.
   *  
   *  Will not be called in case that the future is completed with a value.
   *  
   *  Will be called if the future is completed with a FutureTimeoutException.
   *  
   *  $multipleCallbacks
   */
  def onFailure[U](callback: PartialFunction[Throwable, U]): this.type = onComplete {
    case Left(t) if t.isInstanceOf[FutureTimeoutException] || isFutureThrowable(t) => if (callback.isDefinedAt(t)) callback(t)
    case Right(v) => // do nothing
  }
  
  /** When this future times out, apply the provided function.
   *  
   *  If the future has already timed out,
   *  this will either be applied immediately or be scheduled asynchronously.
   *  
   *  $multipleCallbacks
   */
  def onTimeout[U](callback: FutureTimeoutException => U): this.type = onComplete {
    case Left(te: FutureTimeoutException) => callback(te)
    case Right(v) => // do nothing
  }
  
  /** When this future is completed, either through an exception, a timeout, or a value,
   *  apply the provided function.
   *  
   *  If the future has already been completed,
   *  this will either be applied immediately or be scheduled asynchronously.
   *  
   *  $multipleCallbacks
   */
  def onComplete[U](func: Either[Throwable, T] => U): this.type
  
  
  /* Miscellaneous */
  
  /** The execution context of the future.
   */
  def executionContext: ExecutionContext
  
  /** Creates a new promise.
   */
  def newPromise[S]: Promise[S] = executionContext promise
  
  /** Tests whether this `Future`'s timeout has expired.
   *
   *  $futureTimeout
   *  
   *  Note that an expired Future may still contain a value, or it may be
   *  completed with a value.
   */
  def isTimedout: Boolean
  
  
  /* Projections */
  
  /** Returns a failed projection of this future.
   *  
   *  The failed projection is a future holding a value of type `Throwable`.
   *  
   *  It is completed with a value which is the throwable of the original future
   *  in case the original future is failed.
   *  
   *  It is failed with a `NoSuchElementException` if the original future is completed successfully.
   *  
   *  Blocking on this future returns a value if the original future is completed with an exception
   *  and throws a corresponding exception if the original future fails.
   */
  def failed: Future[Throwable] = new Future[Throwable] {
    def executionContext = self.executionContext
    def onComplete[U](func: Either[Throwable, Throwable] => U) = {
      self.onComplete {
        case Left(t) => func(Right(t))
        case Right(v) => func(Left(noSuchElem(v))) // do nothing
      }
      this
    }
    def isTimedout = self.isTimedout
    def block()(implicit canblock: CanBlock) = try {
      val res = self.block()
      throw noSuchElem(res)
    } catch {
      case t: Throwable => t
    }
    private def noSuchElem(v: T) = 
      new NoSuchElementException("Future.failed not completed with a throwable. Instead completed with: " + v)
  }
  
  /** A timed out projection of this future.
   *  
   *  The timed out projection is a future holding a value of type `FutureTimeoutException`.
   *  
   *  It is completed with a value which is a `FutureTimeoutException` of the original future
   *  in case the original future is timed out.
   *  
   *  It is failed with a `NoSuchElementException` if the original future is completed successfully.
   *  It is failed with the original exception otherwise.
   *  
   *  Blocking on this future returns a value only if the original future timed out, and a
   *  corresponding exception otherwise.
   */
  def timedout: Future[FutureTimeoutException] = new Future[FutureTimeoutException] {
    def executionContext = self.executionContext
    def onComplete[U](func: Either[Throwable, FutureTimeoutException] => U) = {
      self.onComplete {
        case Left(te: FutureTimeoutException) => func(Right(te))
        case Left(t) => func(Left(noSuchElemThrowable(t)))
        case Right(v) => func(Left(noSuchElemValue(v)))
      }
      this
    }
    def isTimedout = self.isTimedout
    def block()(implicit canblock: CanBlock) = try {
      val res = self.block()
      throw noSuchElemValue(res)
    } catch {
      case ft: FutureTimeoutException =>
        ft
      case t: Throwable =>
        throw noSuchElemThrowable(t)
    }
    private def noSuchElemValue(v: T) =
      new NoSuchElementException("Future.timedout didn't time out. Instead completed with: " + v)
    private def noSuchElemThrowable(v: Throwable) =
      new NoSuchElementException("Future.timedout didn't time out. Instead failed with: " + v)
  }
  
  
  /* Monadic operations */
  
  /** Creates a new future that will handle any matching throwable that this
   *  future might contain. If there is no match, or if this future contains
   *  a valid result then the new future will contain the same.
   *  
   *  Example:
   *  
   *  {{{
   *  future (6 / 0) recover { case e: ArithmeticException ⇒ 0 } // result: 0
   *  future (6 / 0) recover { case e: NotFoundException   ⇒ 0 } // result: exception
   *  future (6 / 2) recover { case e: ArithmeticException ⇒ 0 } // result: 3
   *  }}}
   */
  def recover[U >: T](pf: PartialFunction[Throwable, U]): Future[U] = {
    val p = newPromise[U]
    
    onComplete {
      case Left(t) => if (pf isDefinedAt t) p fulfill pf(t) else p break t
      case Right(v) => p fulfill v
    }
    
    p.future
  }
  
  /** Asynchronously processes the value in the future once the value becomes available.
   *  
   *  Will not be called if the future times out or fails.
   *  
   *  This method typically registers an `onSuccess` callback.
   */
  def foreach[U](f: T => U): Unit = onSuccess(f)
  
  /** Creates a new future by applying a function to the successful result of
   *  this future. If this future is completed with an exception then the new
   *  future will also contain this exception.
   *  
   *  $forComprehensionExample
   */
  def map[S](f: T => S): Future[S] = {
    val p = newPromise[S]
    
    onComplete {
      case Left(t) => p break t
      case Right(v) => p fulfill f(v)
    }
    
    p.future
  }
  
  /** Creates a new future by applying a function to the successful result of
   *  this future, and returns the result of the function as the new future.
   *  If this future is completed with an exception then the new future will
   *  also contain this exception.
   *  
   *  $forComprehensionExample
   */
  def flatMap[S](f: T => Future[S]): Future[S] = {
    val p = newPromise[S]
    
    onComplete {
      case Left(t) => p break t
      case Right(v) => f(v) onComplete {
        case Left(t) => p break t
        case Right(v) => p fulfill v
      }
    }
    
    p.future
  }
  
  /** Creates a new future by filtering the value of the current future with a predicate.
   *  
   *  If the current future contains a value which satisfies the predicate, the new future will also hold that value.
   *  Otherwise, the resulting future will fail with a `NoSuchElementException`.
   *  
   *  If the current future fails or times out, the resulting future also fails or times out, respectively.
   *
   *  Example:
   *  {{{
   *  val f = future { 5 }
   *  val g = g filter { _ % 2 == 1 }
   *  val h = f filter { _ % 2 == 0 }
   *  block on g // evaluates to 5
   *  block on h // throw a NoSuchElementException
   *  }}}
   */
  def filter(pred: T => Boolean): Future[T] = {
    val p = newPromise[T]
    
    onComplete {
      case Left(t) => p break t
      case Right(v) => if (pred(v)) p fulfill v else p break new NoSuchElementException("Future.filter predicate is not satisfied by: " + v)
    }
    
    p.future
  }
  
}

object Future {
  
  def all[T,Coll[_] <: Traversable[_]](fs: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[Future[T]],T,Coll[T]]): Future[Coll[T]] = {
    val builder = cbf(fs)
    val p: Promise[Coll[T]] = executionContext.promise[Coll[T]]
    
    if (fs.size == 1) fs.head onComplete {
      case Left(t) => p break t
      case Right(v) => builder += v
        p fulfill builder.result
    } else {
      val restFutures = all(fs.tail)
      fs.head onComplete {
        case Left(t) => p break t
        case Right(v) => builder += v
          restFuture onComplete {
            case Left(t) => p break t
            case Right(vs) => for (v <- vs) builder += v
              p fulfill builder.result
          }
      }
    }
    p.future

  }

}