1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
package xyz.driver.core
import com.typesafe.scalalogging.Logger
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success, Try}
/**
  * Utilities for combining `scala.concurrent.Future`s, including zip and sequence variants
  * that fail as soon as any of the combined futures fails.
  */
object future {

  val log = Logger("Driver.Future")

  /** Adds `mapAll` and `failFastZip` as methods on any `Future[T]`. */
  implicit class RichFuture[T](f: Future[T]) {

    /**
      * Maps the complete outcome of the wrapped future, success or failure alike, through `pf`.
      *
      * A non-fatal exception thrown while applying `pf` to the outcome fails the returned future.
      */
    def mapAll[U](pf: PartialFunction[Try[T], U])(implicit executionContext: ExecutionContext): Future[U] = {
      val mapped = Promise[U]()
      f.onComplete { outcome =>
        mapped.complete(Try(pf(outcome)))
      }
      mapped.future
    }

    /** Method form of the object-level `failFastZip`, with the wrapped future on the left. */
    def failFastZip[U](that: Future[U])(implicit executionContext: ExecutionContext): Future[(T, U)] =
      future.failFastZip(f, that)
  }

  /**
    * Collects the results of every future in `t`, in iteration order, combining them pairwise
    * with `failFastZip`.
    */
  def failFastSequence[T](t: Iterable[Future[T]])(implicit ec: ExecutionContext): Future[Seq[T]] = {
    val seed: Future[List[T]] = Future.successful(List.empty[T])
    // Results are prepended while folding, so the accumulated list is in reverse order.
    val reversed = t.foldLeft(seed) { (acc, next) =>
      failFastZip(acc, next).map { pair => pair._2 :: pair._1 }
    }
    reversed.map(collected => collected.reverse)
  }

  /**
    * Zips two futures, failing as soon as either of them fails.
    *
    * The standard Scala `zip` keeps waiting on the left side even when the right side has
    * already failed.
    */
  def failFastZip[T, U](ft: Future[T], fu: Future[U])(implicit ec: ExecutionContext): Future[(T, U)] = {
    // The side that succeeds first stores its value together with a promise for the value the
    // other side still owes. A failure that arrives first fails this promise directly.
    type FirstDone = Either[(T, Promise[U]), (U, Promise[T])]
    val firstDone = Promise[FirstDone]()

    // Handles completion of the left future.
    def onLeft(outcome: Try[T]): Unit = outcome match {
      case Success(leftValue) =>
        // Create the promise the right side would have to fulfil if the left side wins.
        val awaitingRight = Promise[U]()
        val claimed = firstDone.trySuccess(Left((leftValue, awaitingRight)))
        if (!claimed) {
          // The right side got there first; hand it the left value if it is waiting for one.
          firstDone.future.foreach {
            case Right((_, awaitingLeft)) => awaitingLeft.success(leftValue)
            case Left((existing, _)) => // This should never happen
              log.error(s"Logic error: tried to set Left($leftValue) but Left($existing) already set")
          }
        }
      case Failure(err) =>
        val claimed = firstDone.tryFailure(err)
        if (!claimed) {
          // The right side got there first; if it succeeded, fail the promise it is waiting on.
          firstDone.future.foreach {
            case Right((_, awaitingLeft)) => awaitingLeft.failure(err)
            case Left((existing, _)) => // This should never happen
              log.error(s"Logic error: tried to set Failure($err) but Left($existing) already set")
          }
        }
    }

    // Handles completion of the right future; mirror image of `onLeft`.
    def onRight(outcome: Try[U]): Unit = outcome match {
      case Success(rightValue) =>
        // Create the promise the left side would have to fulfil if the right side wins.
        val awaitingLeft = Promise[T]()
        val claimed = firstDone.trySuccess(Right((rightValue, awaitingLeft)))
        if (!claimed) {
          // The left side got there first; hand it the right value if it is waiting for one.
          firstDone.future.foreach {
            case Left((_, awaitingRight)) => awaitingRight.success(rightValue)
            case Right((existing, _)) => // This should never happen
              log.error(s"Logic error: tried to set Right($rightValue) but Right($existing) already set")
          }
        }
      case Failure(err) =>
        val claimed = firstDone.tryFailure(err)
        if (!claimed) {
          // The left side got there first; if it succeeded, fail the promise it is waiting on.
          firstDone.future.foreach {
            case Left((_, awaitingRight)) => awaitingRight.failure(err)
            case Right((existing, _)) => // This should never happen
              log.error(s"Logic error: tried to set Failure($err) but Right($existing) already set")
          }
        }
    }

    ft.onComplete(outcome => onLeft(outcome))
    fu.onComplete(outcome => onRight(outcome))

    // Once one side has succeeded, wait for the other side's promise and pair the two values.
    firstDone.future.flatMap {
      _.fold(
        { case (leftValue, awaitingRight) => awaitingRight.future.map(rightValue => (leftValue, rightValue)) },
        { case (rightValue, awaitingLeft) => awaitingLeft.future.map(leftValue => (leftValue, rightValue)) }
      )
    }
  }
}
|