From 9788477d1c287c967f4d96c7e756cffd3996c036 Mon Sep 17 00:00:00 2001 From: Zach Smith Date: Wed, 12 Sep 2018 13:29:04 -0700 Subject: RFC Add basicSubscribeWithRestart helper method to StreamBus (#214) * Add basicSubscribeWithRestart helper method to StreamBus * Include helper for subscribe with Flow * PR comments * Rename methods, call run --- .../xyz/driver/core/messaging/StreamBus.scala | 44 +++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala index aabd3dc..a9ba3a7 100644 --- a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala +++ b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala @@ -2,9 +2,12 @@ package xyz.driver.core package messaging import akka.NotUsed -import akka.stream.scaladsl.{Flow, Sink, Source} +import akka.stream.Materializer +import akka.stream.scaladsl.{Flow, RestartSource, Sink, Source} import scala.collection.mutable.ListBuffer +import scala.concurrent.Future +import scala.concurrent.duration._ /** An extension to message buses that offers an Akka-Streams API. * @@ -57,4 +60,43 @@ trait StreamBus extends Bus { .mapConcat(messages => messages.toList) } + def runWithRestart[A]( + topic: Topic[A], + config: SubscriptionConfig = defaultSubscriptionConfig, + minBackoff: FiniteDuration = 3.seconds, + maxBackoff: FiniteDuration = 30.seconds, + randomFactor: Double = 0.2, + maxRestarts: Int = 20 + )(processMessage: Flow[Message[A], List[MessageId], NotUsed])(implicit mat: Materializer): NotUsed = { + RestartSource + .withBackoff[MessageId]( + minBackoff, + maxBackoff, + randomFactor, + maxRestarts + ) { () => + subscribe(topic, config) + .via(processMessage.recover({ case _ => Nil })) + .log(topic.name) + .mapConcat(identity) + } + .to(acknowledge) + .run() + } + + def handleMessage[A]( + topic: Topic[A], + config: SubscriptionConfig = defaultSubscriptionConfig, + parallelism: Int = 1, + minBackoff: FiniteDuration = 3.seconds, + maxBackoff: FiniteDuration = 30.seconds, + randomFactor: Double = 0.2, + maxRestarts: Int = 20 + )(processMessage: A => Future[_])(implicit mat: Materializer): NotUsed = { + runWithRestart(topic, config, minBackoff, maxBackoff, randomFactor, maxRestarts) { + Flow[Message[A]].mapAsync(parallelism) { message => + processMessage(message.data).map(_ => message.id :: Nil) + } + } + } } -- cgit v1.2.3