summaryrefslogtreecommitdiff
path: root/test/files/presentation/akka/src/akka/dispatch/Future.scala
blob: 1ad304d7265d69360a6615a188c2c71f64acd2c8 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
/**
 *  Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
 */

package akka.dispatch

import akka.AkkaException
import akka.event.EventHandler
import akka.actor.{ Actor, Channel }
import akka.util.Duration
import akka.japi.{ Procedure, Function => JFunc }

import scala.util.continuations._

import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable }
import java.util.concurrent.TimeUnit.{ NANOSECONDS => NANOS, MILLISECONDS => MILLIS }
import java.util.concurrent.atomic.{ AtomicBoolean }
import java.lang.{ Iterable => JIterable }
import java.util.{ LinkedList => JLinkedList }
import scala.collection.mutable.Stack
import annotation.tailrec

class FutureTimeoutException(message: String, cause: Throwable = null) extends AkkaException(message, cause)

object Futures {

  /**
   * Java API, equivalent to Future.apply
   */
  def future[T](body: Callable[T]): Future[T] =
    Future(body.call)

  /**
   * Java API, equivalent to Future.apply
   */
  def future[T](body: Callable[T], timeout: Long): Future[T] =
    Future(body.call, timeout)

  /**
   * Java API, equivalent to Future.apply
   */
  def future[T](body: Callable[T], dispatcher: MessageDispatcher): Future[T] =
    Future(body.call)(dispatcher)

  /**
   * Java API, equivalent to Future.apply
   */
  def future[T](body: Callable[T], timeout: Long, dispatcher: MessageDispatcher): Future[T] =
    Future(body.call, timeout)(dispatcher)

  /**
   * Returns a Future to the result of the first future in the list that is completed
   */
  def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Long = Long.MaxValue): Future[T] = {
    val futureResult = new DefaultCompletableFuture[T](timeout)

    val completeFirst: Future[T] => Unit = _.value.foreach(futureResult complete _)
    for (f  futures) f onComplete completeFirst

    futureResult
  }

  /**
   * Java API.
   * Returns a Future to the result of the first future in the list that is completed
   */
  def firstCompletedOf[T <: AnyRef](futures: java.lang.Iterable[Future[T]], timeout: Long): Future[T] =
    firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)

  /**
   * A non-blocking fold over the specified futures.
   * The fold is performed on the thread where the last future is completed,
   * the result will be the first failure of any of the futures, or any failure in the actual fold,
   * or the result of the fold.
   * Example:
   * <pre>
   *   val result = Futures.fold(0)(futures)(_ + _).await.result
   * </pre>
   */
  def fold[T, R](zero: R, timeout: Long = Actor.TIMEOUT)(futures: Iterable[Future[T]])(foldFun: (R, T) => R): Future[R] = {
    if (futures.isEmpty) {
      new AlreadyCompletedFuture[R](Right(zero))
    } else {
      val result = new DefaultCompletableFuture[R](timeout)
      val results = new ConcurrentLinkedQueue[T]()
      val allDone = futures.size

      val aggregate: Future[T] => Unit = f => if (!result.isCompleted) { //TODO: This is an optimization, is it premature?
        f.value.get match {
          case r: Right[Throwable, T] =>
            results add r.b
            if (results.size == allDone) { //Only one thread can get here
              try {
                result completeWithResult scala.collection.JavaConversions.collectionAsScalaIterable(results).foldLeft(zero)(foldFun)
              } catch {
                case e: Exception =>
                  EventHandler.error(e, this, e.getMessage)
                  result completeWithException e
              }
              finally {
                results.clear
              }
            }
          case l: Left[Throwable, T] =>
            result completeWithException l.a
            results.clear
        }
      }

      futures foreach { _ onComplete aggregate }
      result
    }
  }

  /**
   * Java API
   * A non-blocking fold over the specified futures.
   * The fold is performed on the thread where the last future is completed,
   * the result will be the first failure of any of the futures, or any failure in the actual fold,
   * or the result of the fold.
   */
  def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Long, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] =
    fold(zero, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _)

  /**
   * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
   * Example:
   * <pre>
   *   val result = Futures.reduce(futures)(_ + _).await.result
   * </pre>
   */
  def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Long = Actor.TIMEOUT)(op: (R, T) => T): Future[R] = {
    if (futures.isEmpty)
      new AlreadyCompletedFuture[R](Left(new UnsupportedOperationException("empty reduce left")))
    else {
      val result = new DefaultCompletableFuture[R](timeout)
      val seedFound = new AtomicBoolean(false)
      val seedFold: Future[T] => Unit = f => {
        if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold
          f.value.get match {
            case r: Right[Throwable, T] =>
              result.completeWith(fold(r.b, timeout)(futures.filterNot(_ eq f))(op))
            case l: Left[Throwable, T] =>
              result.completeWithException(l.a)
          }
        }
      }
      for (f  futures) f onComplete seedFold //Attach the listener to the Futures
      result
    }
  }

  /**
   * Java API.
   * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
   */
  def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] =
    reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(fun.apply _)

  /**
   * Java API.
   * Simple version of Futures.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.util.LinkedList[A]].
   * Useful for reducing many Futures into a single Future.
   */
  def sequence[A](in: JIterable[Future[A]], timeout: Long): Future[JLinkedList[A]] =
    scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[A]()))((fr, fa) =>
      for (r  fr; a  fa) yield {
        r add a
        r
      })

  /**
   * Java API.
   * Simple version of Futures.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.util.LinkedList[A]].
   * Useful for reducing many Futures into a single Future.
   */
  def sequence[A](in: JIterable[Future[A]]): Future[JLinkedList[A]] = sequence(in, Actor.TIMEOUT)

  /**
   * Java API.
   * Transforms a java.lang.Iterable[A] into a Future[java.util.LinkedList[B]] using the provided Function A => Future[B].
   * This is useful for performing a parallel map. For example, to apply a function to all items of a list
   * in parallel.
   */
  def traverse[A, B](in: JIterable[A], timeout: Long, fn: JFunc[A, Future[B]]): Future[JLinkedList[B]] =
    scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[B]())) { (fr, a) =>
      val fb = fn(a)
      for (r  fr; b  fb) yield {
        r add b
        r
      }
    }

  /**
   * Java API.
   * Transforms a java.lang.Iterable[A] into a Future[java.util.LinkedList[B]] using the provided Function A => Future[B].
   * This is useful for performing a parallel map. For example, to apply a function to all items of a list
   * in parallel.
   */
  def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]]): Future[JLinkedList[B]] = traverse(in, Actor.TIMEOUT, fn)

  // =====================================
  // Deprecations
  // =====================================

  /**
   * (Blocking!)
   */
  @deprecated("Will be removed after 1.1, if you must block, use: futures.foreach(_.await)", "1.1")
  def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await)

  /**
   *  Returns the First Future that is completed (blocking!)
   */
  @deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(futures).await", "1.1")
  def awaitOne(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = firstCompletedOf[Any](futures, timeout).await

  /**
   * Applies the supplied function to the specified collection of Futures after awaiting each future to be completed
   */
  @deprecated("Will be removed after 1.1, if you must block, use: futures map { f => fun(f.await) }", "1.1")
  def awaitMap[A, B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] =
    in map { f => fun(f.await) }

  /**
   * Returns Future.resultOrException of the first completed of the 2 Futures provided (blocking!)
   */
  @deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(List(f1,f2)).await.resultOrException", "1.1")
  def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = firstCompletedOf[T](List(f1, f2)).await.resultOrException
}

object Future {
  /**
   * This method constructs and returns a Future that will eventually hold the result of the execution of the supplied body
   * The execution is performed by the specified Dispatcher.
   */
  def apply[T](body: => T, timeout: Long = Actor.TIMEOUT)(implicit dispatcher: MessageDispatcher): Future[T] =
    dispatcher.dispatchFuture(() => body, timeout)

  /**
   * Construct a completable channel
   */
  def channel(timeout: Long = Actor.TIMEOUT) = new Channel[Any] {
    val future = empty[Any](timeout)
    def !(msg: Any) = future completeWithResult msg
  }

  /**
   * Create an empty Future with default timeout
   */
  def empty[T](timeout: Long = Actor.TIMEOUT) = new DefaultCompletableFuture[T](timeout)

  import scala.collection.mutable.Builder
  import scala.collection.generic.CanBuildFrom

  /**
   * Simple version of Futures.traverse. Transforms a Traversable[Future[A]] into a Future[Traversable[A]].
   * Useful for reducing many Futures into a single Future.
   */
  def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Long = Actor.TIMEOUT)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] =
    in.foldLeft(new DefaultCompletableFuture[Builder[A, M[A]]](timeout).completeWithResult(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) => for (r  fr; a  fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result)

  /**
   * Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A => Future[B].
   * This is useful for performing a parallel map. For example, to apply a function to all items of a list
   * in parallel:
   * <pre>
   * val myFutureList = Futures.traverse(myList)(x => Future(myFunc(x)))
   * </pre>
   */
  def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Long = Actor.TIMEOUT)(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] =
    in.foldLeft(new DefaultCompletableFuture[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) =>
      val fb = fn(a.asInstanceOf[A])
      for (r  fr; b  fb) yield (r += b)
    }.map(_.result)

  /**
   * Captures a block that will be transformed into 'Continuation Passing Style' using Scala's Delimited
   * Continuations plugin.
   *
   * Within the block, the result of a Future may be accessed by calling Future.apply. At that point
   * execution is suspended with the rest of the block being stored in a continuation until the result
   * of the Future is available. If an Exception is thrown while processing, it will be contained
   * within the resulting Future.
   *
   * This allows working with Futures in an imperative style without blocking for each result.
   *
   * Completing a Future using 'CompletableFuture << Future' will also suspend execution until the
   * value of the other Future is available.
   *
   * The Delimited Continuations compiler plugin must be enabled in order to use this method.
   */
  def flow[A](body: => A @cps[Future[Any]], timeout: Long = Actor.TIMEOUT): Future[A] = {
    val future = Promise[A](timeout)
    (reset(future.asInstanceOf[CompletableFuture[Any]].completeWithResult(body)): Future[Any]) onComplete { f =>
      val opte = f.exception
      if (opte.isDefined) future completeWithException (opte.get)
    }
    future
  }

  private[akka] val callbacksPendingExecution = new ThreadLocal[Option[Stack[() => Unit]]]() {
    override def initialValue = None
  }
}

sealed trait Future[+T] {

  /**
   * For use only within a Future.flow block or another compatible Delimited Continuations reset block.
   *
   * Returns the result of this Future without blocking, by suspending execution and storing it as a
   * continuation until the result is available.
   *
   * If this Future is untyped (a Future[Nothing]), a type parameter must be explicitly provided or
   * execution will fail. The normal result of getting a Future from an ActorRef using !!! will return
   * an untyped Future.
   */
  def apply[A >: T](): A @cps[Future[Any]] = shift(this flatMap (_: A => Future[Any]))

  /**
   * Blocks awaiting completion of this Future, then returns the resulting value,
   * or throws the completed exception
   *
   * Scala & Java API
   *
   * throws FutureTimeoutException if this Future times out when waiting for completion
   */
  def get: T = this.await.resultOrException.get

  /**
   * Blocks the current thread until the Future has been completed or the
   * timeout has expired. In the case of the timeout expiring a
   * FutureTimeoutException will be thrown.
   */
  def await: Future[T]

  /**
   * Blocks the current thread until the Future has been completed or the
   * timeout has expired. The timeout will be the least value of 'atMost' and the timeout
   * supplied at the constructuion of this Future.
   * In the case of the timeout expiring a FutureTimeoutException will be thrown.
   */
  def await(atMost: Duration): Future[T]

  /**
   * Blocks the current thread until the Future has been completed. Use
   * caution with this method as it ignores the timeout and will block
   * indefinitely if the Future is never completed.
   */
  @deprecated("Will be removed after 1.1, it's dangerous and can cause deadlocks, agony and insanity.", "1.1")
  def awaitBlocking: Future[T]

  /**
   * Tests whether this Future has been completed.
   */
  final def isCompleted: Boolean = value.isDefined

  /**
   * Tests whether this Future's timeout has expired.
   *
   * Note that an expired Future may still contain a value, or it may be
   * completed with a value.
   */
  def isExpired: Boolean

  /**
   * This Future's timeout in nanoseconds.
   */
  def timeoutInNanos: Long

  /**
   * The contained value of this Future. Before this Future is completed
   * the value will be None. After completion the value will be Some(Right(t))
   * if it contains a valid result, or Some(Left(error)) if it contains
   * an exception.
   */
  def value: Option[Either[Throwable, T]]

  /**
   * Returns the successful result of this Future if it exists.
   */
  final def result: Option[T] = {
    val v = value
    if (v.isDefined) v.get.right.toOption
    else None
  }

  /**
   * Returns the contained exception of this Future if it exists.
   */
  final def exception: Option[Throwable] = {
    val v = value
    if (v.isDefined) v.get.left.toOption
    else None
  }

  /**
   * When this Future is completed, apply the provided function to the
   * Future. If the Future has already been completed, this will apply
   * immediately.
   */
  def onComplete(func: Future[T] => Unit): Future[T]

  /**
   * When the future is completed with a valid result, apply the provided
   * PartialFunction to the result.
   * <pre>
   *   val result = future receive {
   *     case Foo => "foo"
   *     case Bar => "bar"
   *   }.await.result
   * </pre>
   */
  final def receive(pf: PartialFunction[Any, Unit]): Future[T] = onComplete { f =>
    val optr = f.result
    if (optr.isDefined) {
      val r = optr.get
      if (pf.isDefinedAt(r)) pf(r)
    }
  }

  /**
   * Creates a new Future by applying a PartialFunction to the successful
   * result of this Future if a match is found, or else return a MatchError.
   * If this Future is completed with an exception then the new Future will
   * also contain this exception.
   * Example:
   * <pre>
   * val future1 = for {
   *   a <- actor !!! Req("Hello") collect { case Res(x: Int)    => x }
   *   b <- actor !!! Req(a)       collect { case Res(x: String) => x }
   *   c <- actor !!! Req(7)       collect { case Res(x: String) => x }
   * } yield b + "-" + c
   * </pre>
   */
  final def collect[A](pf: PartialFunction[Any, A]): Future[A] = {
    val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS)
    onComplete { ft =>
      val v = ft.value.get
      fa complete {
        if (v.isLeft) v.asInstanceOf[Either[Throwable, A]]
        else {
          try {
            val r = v.right.get
            if (pf isDefinedAt r) Right(pf(r))
            else Left(new MatchError(r))
          } catch {
            case e: Exception =>
              EventHandler.error(e, this, e.getMessage)
              Left(e)
          }
        }
      }
    }
    fa
  }

  /**
   * Creates a new Future that will handle any matching Throwable that this
   * Future might contain. If there is no match, or if this Future contains
   * a valid result then the new Future will contain the same.
   * Example:
   * <pre>
   * Future(6 / 0) failure { case e: ArithmeticException => 0 } // result: 0
   * Future(6 / 0) failure { case e: NotFoundException   => 0 } // result: exception
   * Future(6 / 2) failure { case e: ArithmeticException => 0 } // result: 3
   * </pre>
   */
  final def failure[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = {
    val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS)
    onComplete { ft =>
      val opte = ft.exception
      fa complete {
        if (opte.isDefined) {
          val e = opte.get
          try {
            if (pf isDefinedAt e) Right(pf(e))
            else Left(e)
          } catch {
            case x: Exception => Left(x)
          }
        } else ft.value.get
      }
    }
    fa
  }

  /**
   * Creates a new Future by applying a function to the successful result of
   * this Future. If this Future is completed with an exception then the new
   * Future will also contain this exception.
   * Example:
   * <pre>
   * val future1 = for {
   *   a: Int    <- actor !!! "Hello" // returns 5
   *   b: String <- actor !!! a       // returns "10"
   *   c: String <- actor !!! 7       // returns "14"
   * } yield b + "-" + c
   * </pre>
   */
  final def map[A](f: T => A): Future[A] = {
    val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS)
    onComplete { ft =>
      val optv = ft.value
      if (optv.isDefined) {
        val v = optv.get
        if (v.isLeft)
          fa complete v.asInstanceOf[Either[Throwable, A]]
        else {
          fa complete (try {
            Right(f(v.right.get))
          } catch {
            case e: Exception =>
              EventHandler.error(e, this, e.getMessage)
              Left(e)
          })
        }
      }
    }
    fa
  }

  /**
   * Creates a new Future by applying a function to the successful result of
   * this Future, and returns the result of the function as the new Future.
   * If this Future is completed with an exception then the new Future will
   * also contain this exception.
   * Example:
   * <pre>
   * val future1 = for {
   *   a: Int    <- actor !!! "Hello" // returns 5
   *   b: String <- actor !!! a       // returns "10"
   *   c: String <- actor !!! 7       // returns "14"
   * } yield b + "-" + c
   * </pre>
   */
  final def flatMap[A](f: T => Future[A]): Future[A] = {
    val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS)
    onComplete { ft =>
      val optv = ft.value
      if (optv.isDefined) {
        val v = optv.get
        if (v.isLeft)
          fa complete v.asInstanceOf[Either[Throwable, A]]
        else {
          try {
            fa.completeWith(f(v.right.get))
          } catch {
            case e: Exception =>
              EventHandler.error(e, this, e.getMessage)
              fa completeWithException e
          }
        }
      }
    }
    fa
  }

  final def foreach(f: T => Unit): Unit = onComplete { ft =>
    val optr = ft.result
    if (optr.isDefined)
      f(optr.get)
  }

  final def filter(p: Any => Boolean): Future[Any] = {
    val f = new DefaultCompletableFuture[T](timeoutInNanos, NANOS)
    onComplete { ft =>
      val optv = ft.value
      if (optv.isDefined) {
        val v = optv.get
        if (v.isLeft)
          f complete v
        else {
          val r = v.right.get
          f complete (try {
            if (p(r)) Right(r)
            else Left(new MatchError(r))
          } catch {
            case e: Exception =>
              EventHandler.error(e, this, e.getMessage)
              Left(e)
          })
        }
      }
    }
    f
  }

  /**
   * Returns the current result, throws the exception is one has been raised, else returns None
   */
  final def resultOrException: Option[T] = {
    val v = value
    if (v.isDefined) {
      val r = v.get
      if (r.isLeft) throw r.left.get
      else r.right.toOption
    } else None
  }

  /* Java API */
  final def onComplete[A >: T](proc: Procedure[Future[A]]): Future[T] = onComplete(proc(_))

  final def map[A >: T, B](f: JFunc[A, B]): Future[B] = map(f(_))

  final def flatMap[A >: T, B](f: JFunc[A, Future[B]]): Future[B] = flatMap(f(_))

  final def foreach[A >: T](proc: Procedure[A]): Unit = foreach(proc(_))

  final def filter(p: JFunc[Any, Boolean]): Future[Any] = filter(p(_))

}

object Promise {

  def apply[A](timeout: Long): CompletableFuture[A] = new DefaultCompletableFuture[A](timeout)

  def apply[A](): CompletableFuture[A] = apply(Actor.TIMEOUT)

}

/**
 * Essentially this is the Promise (or write-side) of a Future (read-side).
 */
trait CompletableFuture[T] extends Future[T] {
  /**
   * Completes this Future with the specified result, if not already completed.
   * @return this
   */
  def complete(value: Either[Throwable, T]): Future[T]

  /**
   * Completes this Future with the specified result, if not already completed.
   * @return this
   */
  final def completeWithResult(result: T): Future[T] = complete(Right(result))

  /**
   * Completes this Future with the specified exception, if not already completed.
   * @return this
   */
  final def completeWithException(exception: Throwable): Future[T] = complete(Left(exception))

  /**
   * Completes this Future with the specified other Future, when that Future is completed,
   * unless this Future has already been completed.
   * @return this.
   */
  final def completeWith(other: Future[T]): Future[T] = {
    other onComplete { f => complete(f.value.get) }
    this
  }

  final def <<(value: T): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] => Future[Any]) => cont(complete(Right(value))) }

  final def <<(other: Future[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] => Future[Any]) =>
    val fr = new DefaultCompletableFuture[Any](Actor.TIMEOUT)
    this completeWith other onComplete { f =>
      try {
        fr completeWith cont(f)
      } catch {
        case e: Exception =>
          EventHandler.error(e, this, e.getMessage)
          fr completeWithException e
      }
    }
    fr
  }

}

/**
 * The default concrete Future implementation.
 */
class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends CompletableFuture[T] {

  def this() = this(0, MILLIS)

  def this(timeout: Long) = this(timeout, MILLIS)

  val timeoutInNanos = timeunit.toNanos(timeout)
  private val _startTimeInNanos = currentTimeInNanos
  private val _lock = new ReentrantLock
  private val _signal = _lock.newCondition
  private var _value: Option[Either[Throwable, T]] = None
  private var _listeners: List[Future[T] => Unit] = Nil

  /**
   * Must be called inside _lock.lock<->_lock.unlock
   */
  @tailrec
  private def awaitUnsafe(waitTimeNanos: Long): Boolean = {
    if (_value.isEmpty && waitTimeNanos > 0) {
      val start = currentTimeInNanos
      val remainingNanos = try {
        _signal.awaitNanos(waitTimeNanos)
      } catch {
        case e: InterruptedException =>
          waitTimeNanos - (currentTimeInNanos - start)
      }
      awaitUnsafe(remainingNanos)
    } else {
      _value.isDefined
    }
  }

  def await(atMost: Duration) = {
    _lock.lock
    if (try { awaitUnsafe(atMost.toNanos min timeLeft()) } finally { _lock.unlock }) this
    else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(timeoutInNanos) + "] milliseconds")
  }

  def await = {
    _lock.lock
    if (try { awaitUnsafe(timeLeft()) } finally { _lock.unlock }) this
    else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(timeoutInNanos) + "] milliseconds")
  }

  def awaitBlocking = {
    _lock.lock
    try {
      while (_value.isEmpty) {
        _signal.await
      }
      this
    } finally {
      _lock.unlock
    }
  }

  def isExpired: Boolean = timeLeft() <= 0

  def value: Option[Either[Throwable, T]] = {
    _lock.lock
    try {
      _value
    } finally {
      _lock.unlock
    }
  }

  def complete(value: Either[Throwable, T]): DefaultCompletableFuture[T] = {
    _lock.lock
    val notifyTheseListeners = try {
      if (_value.isEmpty && !isExpired) { //Only complete if we aren't expired
        _value = Some(value)
        val existingListeners = _listeners
        _listeners = Nil
        existingListeners
      } else Nil
    } finally {
      _signal.signalAll
      _lock.unlock
    }

    if (notifyTheseListeners.nonEmpty) { // Steps to ensure we don't run into a stack-overflow situation
      @tailrec
      def runCallbacks(rest: List[Future[T] => Unit], callbacks: Stack[() => Unit]) {
        if (rest.nonEmpty) {
          notifyCompleted(rest.head)
          while (callbacks.nonEmpty) { callbacks.pop().apply() }
          runCallbacks(rest.tail, callbacks)
        }
      }

      val pending = Future.callbacksPendingExecution.get
      if (pending.isDefined) { //Instead of nesting the calls to the callbacks (leading to stack overflow)
        pending.get.push(() => { // Linearize/aggregate callbacks at top level and then execute
          val doNotify = notifyCompleted _ //Hoist closure to avoid garbage
          notifyTheseListeners foreach doNotify
        })
      } else {
        try {
          val callbacks = Stack[() => Unit]() // Allocate new aggregator for pending callbacks
          Future.callbacksPendingExecution.set(Some(callbacks)) // Specify the callback aggregator
          runCallbacks(notifyTheseListeners, callbacks) // Execute callbacks, if they trigger new callbacks, they are aggregated
        } finally { Future.callbacksPendingExecution.set(None) } // Ensure cleanup
      }
    }

    this
  }

  def onComplete(func: Future[T] => Unit): CompletableFuture[T] = {
    _lock.lock
    val notifyNow = try {
      if (_value.isEmpty) {
        if (!isExpired) { //Only add the listener if the future isn't expired
          _listeners ::= func
          false
        } else false //Will never run the callback since the future is expired
      } else true
    } finally {
      _lock.unlock
    }

    if (notifyNow) notifyCompleted(func)

    this
  }

  private def notifyCompleted(func: Future[T] => Unit) {
    try {
      func(this)
    } catch {
      case e => EventHandler notify EventHandler.Error(e, this)
    }
  }

  @inline
  private def currentTimeInNanos: Long = MILLIS.toNanos(System.currentTimeMillis)
  @inline
  private def timeLeft(): Long = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)
}

/**
 * An already completed Future is seeded with it's result at creation, is useful for when you are participating in
 * a Future-composition but you already have a value to contribute.
 */
sealed class AlreadyCompletedFuture[T](suppliedValue: Either[Throwable, T]) extends CompletableFuture[T] {
  val value = Some(suppliedValue)

  def complete(value: Either[Throwable, T]): CompletableFuture[T] = this
  def onComplete(func: Future[T] => Unit): Future[T] = { func(this); this }
  def await(atMost: Duration): Future[T] = this
  def await: Future[T] = this
  def awaitBlocking: Future[T] = this
  def isExpired: Boolean = true
  def timeoutInNanos: Long = 0
}