diff options
author | Zach Smith <zach@driver.xyz> | 2018-09-12 13:29:04 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-09-12 13:29:04 -0700 |
commit | 9788477d1c287c967f4d96c7e756cffd3996c036 (patch) | |
tree | 356bdb42b095a2975ee0e60c8e13605111e40342 /src/main/scala/xyz/driver/core/messaging/StreamBus.scala | |
parent | 8fef53d44a57008dea411b882b12bc3d5d1ca2e0 (diff) | |
download | driver-core-9788477d1c287c967f4d96c7e756cffd3996c036.tar.gz driver-core-9788477d1c287c967f4d96c7e756cffd3996c036.tar.bz2 driver-core-9788477d1c287c967f4d96c7e756cffd3996c036.zip |
RFC Add basicSubscribeWithRestart helper method to StreamBus (#214)
* Add basicSubscribeWithRestart helper method to StreamBus
* Include helper for subscribe with Flow
* PR comments
* Rename methods, call run
Diffstat (limited to 'src/main/scala/xyz/driver/core/messaging/StreamBus.scala')
-rw-r--r-- | src/main/scala/xyz/driver/core/messaging/StreamBus.scala | 44 |
1 files changed, 43 insertions, 1 deletions
diff --git a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala index aabd3dc..a9ba3a7 100644 --- a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala +++ b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala @@ -2,9 +2,12 @@ package xyz.driver.core package messaging import akka.NotUsed -import akka.stream.scaladsl.{Flow, Sink, Source} +import akka.stream.Materializer +import akka.stream.scaladsl.{Flow, RestartSource, Sink, Source} import scala.collection.mutable.ListBuffer +import scala.concurrent.Future +import scala.concurrent.duration._ /** An extension to message buses that offers an Akka-Streams API. * @@ -57,4 +60,43 @@ trait StreamBus extends Bus { .mapConcat(messages => messages.toList) } + def runWithRestart[A]( + topic: Topic[A], + config: SubscriptionConfig = defaultSubscriptionConfig, + minBackoff: FiniteDuration = 3.seconds, + maxBackoff: FiniteDuration = 30.seconds, + randomFactor: Double = 0.2, + maxRestarts: Int = 20 + )(processMessage: Flow[Message[A], List[MessageId], NotUsed])(implicit mat: Materializer): NotUsed = { + RestartSource + .withBackoff[MessageId]( + minBackoff, + maxBackoff, + randomFactor, + maxRestarts + ) { () => + subscribe(topic, config) + .via(processMessage.recover({ case _ => Nil })) + .log(topic.name) + .mapConcat(identity) + } + .to(acknowledge) + .run() + } + + def handleMessage[A]( + topic: Topic[A], + config: SubscriptionConfig = defaultSubscriptionConfig, + parallelism: Int = 1, + minBackoff: FiniteDuration = 3.seconds, + maxBackoff: FiniteDuration = 30.seconds, + randomFactor: Double = 0.2, + maxRestarts: Int = 20 + )(processMessage: A => Future[_])(implicit mat: Materializer): NotUsed = { + runWithRestart(topic, config, minBackoff, maxBackoff, randomFactor, maxRestarts) { + Flow[Message[A]].mapAsync(parallelism) { message => + processMessage(message.data).map(_ => message.id :: Nil) + } + } + } } |