aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZach Smith <zach@driver.xyz>2018-09-12 13:29:04 -0700
committerGitHub <noreply@github.com>2018-09-12 13:29:04 -0700
commit9788477d1c287c967f4d96c7e756cffd3996c036 (patch)
tree356bdb42b095a2975ee0e60c8e13605111e40342
parent8fef53d44a57008dea411b882b12bc3d5d1ca2e0 (diff)
downloaddriver-core-9788477d1c287c967f4d96c7e756cffd3996c036.tar.gz
driver-core-9788477d1c287c967f4d96c7e756cffd3996c036.tar.bz2
driver-core-9788477d1c287c967f4d96c7e756cffd3996c036.zip
RFC Add basicSubscribeWithRestart helper method to StreamBus (#214)
* Add basicSubscribeWithRestart helper method to StreamBus * Include helper for subscribe with Flow * PR comments * Rename methods, call run
-rw-r--r--src/main/scala/xyz/driver/core/messaging/StreamBus.scala44
1 files changed, 43 insertions, 1 deletions
diff --git a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
index aabd3dc..a9ba3a7 100644
--- a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
+++ b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
@@ -2,9 +2,12 @@ package xyz.driver.core
package messaging
import akka.NotUsed
-import akka.stream.scaladsl.{Flow, Sink, Source}
+import akka.stream.Materializer
+import akka.stream.scaladsl.{Flow, RestartSource, Sink, Source}
import scala.collection.mutable.ListBuffer
+import scala.concurrent.Future
+import scala.concurrent.duration._
/** An extension to message buses that offers an Akka-Streams API.
*
@@ -57,4 +60,43 @@ trait StreamBus extends Bus {
.mapConcat(messages => messages.toList)
}
+ def runWithRestart[A](
+ topic: Topic[A],
+ config: SubscriptionConfig = defaultSubscriptionConfig,
+ minBackoff: FiniteDuration = 3.seconds,
+ maxBackoff: FiniteDuration = 30.seconds,
+ randomFactor: Double = 0.2,
+ maxRestarts: Int = 20
+ )(processMessage: Flow[Message[A], List[MessageId], NotUsed])(implicit mat: Materializer): NotUsed = {
+ RestartSource
+ .withBackoff[MessageId](
+ minBackoff,
+ maxBackoff,
+ randomFactor,
+ maxRestarts
+ ) { () =>
+ subscribe(topic, config)
+ .via(processMessage.recover({ case _ => Nil }))
+ .log(topic.name)
+ .mapConcat(identity)
+ }
+ .to(acknowledge)
+ .run()
+ }
+
+ def handleMessage[A](
+ topic: Topic[A],
+ config: SubscriptionConfig = defaultSubscriptionConfig,
+ parallelism: Int = 1,
+ minBackoff: FiniteDuration = 3.seconds,
+ maxBackoff: FiniteDuration = 30.seconds,
+ randomFactor: Double = 0.2,
+ maxRestarts: Int = 20
+ )(processMessage: A => Future[_])(implicit mat: Materializer): NotUsed = {
+ runWithRestart(topic, config, minBackoff, maxBackoff, randomFactor, maxRestarts) {
+ Flow[Message[A]].mapAsync(parallelism) { message =>
+ processMessage(message.data).map(_ => message.id :: Nil)
+ }
+ }
+ }
}