aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/core/future.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/xyz/driver/core/future.scala')
-rw-r--r--src/main/scala/xyz/driver/core/future.scala87
1 files changed, 87 insertions, 0 deletions
diff --git a/src/main/scala/xyz/driver/core/future.scala b/src/main/scala/xyz/driver/core/future.scala
new file mode 100644
index 0000000..1ee3576
--- /dev/null
+++ b/src/main/scala/xyz/driver/core/future.scala
@@ -0,0 +1,87 @@
+package xyz.driver.core
+
+import com.typesafe.scalalogging.Logger
+
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success, Try}
+
+object future {
+ val log = Logger("Driver.Future")
+
+ implicit class RichFuture[T](f: Future[T]) {
+ def mapAll[U](pf: PartialFunction[Try[T], U])(implicit executionContext: ExecutionContext): Future[U] = {
+ val p = Promise[U]()
+ f.onComplete(r => p.complete(Try(pf(r))))
+ p.future
+ }
+
+ def failFastZip[U](that: Future[U])(implicit executionContext: ExecutionContext): Future[(T, U)] = {
+ future.failFastZip(f, that)
+ }
+ }
+
+ def failFastSequence[T](t: Iterable[Future[T]])(implicit ec: ExecutionContext): Future[Seq[T]] = {
+ t.foldLeft(Future.successful(Nil: List[T])) { (f, i) =>
+ failFastZip(f, i).map { case (tail, h) => h :: tail }
+ }
+ .map(_.reverse)
+ }
+
+ /**
+ * Standard scala zip waits forever on the left side, even if the right side fails
+ */
+ def failFastZip[T, U](ft: Future[T], fu: Future[U])(implicit ec: ExecutionContext): Future[(T, U)] = {
+ type State = Either[(T, Promise[U]), (U, Promise[T])]
+ val middleState = Promise[State]()
+
+ ft.onComplete {
+ case f @ Failure(err) =>
+ if (!middleState.tryFailure(err)) {
+ // the right has already succeeded
+ middleState.future.foreach {
+ case Right((_, pt)) => pt.complete(f)
+ case Left((t1, _)) => // This should never happen
+ log.error(s"Logic error: tried to set Failure($err) but Left($t1) already set")
+ }
+ }
+ case Success(t) =>
+ // Create the next promise:
+ val pu = Promise[U]()
+ if (!middleState.trySuccess(Left((t, pu)))) {
+ // we can't set, so the other promise beat us here.
+ middleState.future.foreach {
+ case Right((_, pt)) => pt.success(t)
+ case Left((t1, _)) => // This should never happen
+ log.error(s"Logic error: tried to set Left($t) but Left($t1) already set")
+ }
+ }
+ }
+ fu.onComplete {
+ case f @ Failure(err) =>
+ if (!middleState.tryFailure(err)) {
+ // we can't set, so the other promise beat us here.
+ middleState.future.foreach {
+ case Left((_, pu)) => pu.complete(f)
+ case Right((u1, _)) => // This should never happen
+ log.error(s"Logic error: tried to set Failure($err) but Right($u1) already set")
+ }
+ }
+ case Success(u) =>
+ // Create the next promise:
+ val pt = Promise[T]()
+ if (!middleState.trySuccess(Right((u, pt)))) {
+ // we can't set, so the other promise beat us here.
+ middleState.future.foreach {
+ case Left((_, pu)) => pu.success(u)
+ case Right((u1, _)) => // This should never happen
+ log.error(s"Logic error: tried to set Right($u) but Right($u1) already set")
+ }
+ }
+ }
+
+ middleState.future.flatMap {
+ case Left((t, pu)) => pu.future.map((t, _))
+ case Right((u, pt)) => pt.future.map((_, u))
+ }
+ }
+}