aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/core/future.scala
blob: 1ee3576e9c7208e6699f188a49187aedee232973 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package xyz.driver.core

import com.typesafe.scalalogging.Logger

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success, Try}

object future {
  val log = Logger("Driver.Future")

  implicit class RichFuture[T](f: Future[T]) {
    def mapAll[U](pf: PartialFunction[Try[T], U])(implicit executionContext: ExecutionContext): Future[U] = {
      val p = Promise[U]()
      f.onComplete(r => p.complete(Try(pf(r))))
      p.future
    }

    def failFastZip[U](that: Future[U])(implicit executionContext: ExecutionContext): Future[(T, U)] = {
      future.failFastZip(f, that)
    }
  }

  def failFastSequence[T](t: Iterable[Future[T]])(implicit ec: ExecutionContext): Future[Seq[T]] = {
    t.foldLeft(Future.successful(Nil: List[T])) { (f, i) =>
        failFastZip(f, i).map { case (tail, h) => h :: tail }
      }
      .map(_.reverse)
  }

  /**
    * Standard scala zip waits forever on the left side, even if the right side fails
    */
  def failFastZip[T, U](ft: Future[T], fu: Future[U])(implicit ec: ExecutionContext): Future[(T, U)] = {
    type State = Either[(T, Promise[U]), (U, Promise[T])]
    val middleState = Promise[State]()

    ft.onComplete {
      case f @ Failure(err) =>
        if (!middleState.tryFailure(err)) {
          // the right has already succeeded
          middleState.future.foreach {
            case Right((_, pt)) => pt.complete(f)
            case Left((t1, _)) => // This should never happen
              log.error(s"Logic error: tried to set Failure($err) but Left($t1) already set")
          }
        }
      case Success(t) =>
        // Create the next promise:
        val pu = Promise[U]()
        if (!middleState.trySuccess(Left((t, pu)))) {
          // we can't set, so the other promise beat us here.
          middleState.future.foreach {
            case Right((_, pt)) => pt.success(t)
            case Left((t1, _)) => // This should never happen
              log.error(s"Logic error: tried to set Left($t) but Left($t1) already set")
          }
        }
    }
    fu.onComplete {
      case f @ Failure(err) =>
        if (!middleState.tryFailure(err)) {
          // we can't set, so the other promise beat us here.
          middleState.future.foreach {
            case Left((_, pu)) => pu.complete(f)
            case Right((u1, _)) => // This should never happen
              log.error(s"Logic error: tried to set Failure($err) but Right($u1) already set")
          }
        }
      case Success(u) =>
        // Create the next promise:
        val pt = Promise[T]()
        if (!middleState.trySuccess(Right((u, pt)))) {
          // we can't set, so the other promise beat us here.
          middleState.future.foreach {
            case Left((_, pu)) => pu.success(u)
            case Right((u1, _)) => // This should never happen
              log.error(s"Logic error: tried to set Right($u) but Right($u1) already set")
          }
        }
    }

    middleState.future.flatMap {
      case Left((t, pu))  => pu.future.map((t, _))
      case Right((u, pt)) => pt.future.map((_, u))
    }
  }
}