aboutsummaryrefslogtreecommitdiff
path: root/core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala')
-rw-r--r--core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala102
1 files changed, 102 insertions, 0 deletions
diff --git a/core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala b/core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
new file mode 100644
index 0000000..44d75cd
--- /dev/null
+++ b/core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
@@ -0,0 +1,102 @@
+package xyz.driver.core
+package messaging
+
+import akka.NotUsed
+import akka.stream.Materializer
+import akka.stream.scaladsl.{Flow, RestartSource, Sink, Source}
+
+import scala.collection.mutable.ListBuffer
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+/** An extension to message buses that offers an Akka-Streams API.
+ *
+ * Example usage of a stream that subscribes to one topic, prints any messages
+ * it receives and finally acknowledges them
+ * their receipt.
+ * {{{
+ * val bus: StreamBus = ???
+ * val topic = Topic.string("topic")
+ * bus.subscribe(topic1)
+ * .map{ msg =>
+ * print(msg.data)
+ * msg
+ * }
+ * .to(bus.acknowledge)
+ * .run()
+ * }}} */
+trait StreamBus extends Bus {
+
+ /** Flow that publishes any messages to a given topic.
+ * Emits messages once they have been published to the underlying bus. */
+ def publish[A](topic: Topic[A]): Flow[A, A, NotUsed] = {
+ Flow[A]
+ .batch(defaultMaxMessages.toLong, a => ListBuffer[A](a))(_ += _)
+ .mapAsync(1) { a =>
+ publishMessages(topic, a).map(_ => a)
+ }
+ .mapConcat(list => list.toList)
+ }
+
+ /** Sink that acknowledges the receipt of a message. */
+ def acknowledge: Sink[MessageId, NotUsed] = {
+ Flow[MessageId]
+ .batch(defaultMaxMessages.toLong, a => ListBuffer[MessageId](a))(_ += _)
+ .mapAsync(1)(acknowledgeMessages(_))
+ .to(Sink.ignore)
+ }
+
+ /** Source that listens to a subscription and receives any messages sent to its topic. */
+ def subscribe[A](
+ topic: Topic[A],
+ config: SubscriptionConfig = defaultSubscriptionConfig): Source[Message[A], NotUsed] = {
+ Source
+ .unfoldAsync((topic, config))(
+ topicAndConfig =>
+ fetchMessages(topicAndConfig._1, topicAndConfig._2, defaultMaxMessages).map(msgs =>
+ Some(topicAndConfig -> msgs))
+ )
+ .filter(_.nonEmpty)
+ .mapConcat(messages => messages.toList)
+ }
+
+ def runWithRestart[A](
+ topic: Topic[A],
+ config: SubscriptionConfig = defaultSubscriptionConfig,
+ minBackoff: FiniteDuration = 3.seconds,
+ maxBackoff: FiniteDuration = 30.seconds,
+ randomFactor: Double = 0.2,
+ maxRestarts: Int = 20
+ )(processMessage: Flow[Message[A], List[MessageId], NotUsed])(implicit mat: Materializer): NotUsed = {
+ RestartSource
+ .withBackoff[MessageId](
+ minBackoff,
+ maxBackoff,
+ randomFactor,
+ maxRestarts
+ ) { () =>
+ subscribe(topic, config)
+ .via(processMessage)
+ .log(topic.name)
+ .mapConcat(identity)
+ }
+ .to(acknowledge)
+ .run()
+ }
+
+ def handleMessage[A](
+ topic: Topic[A],
+ config: SubscriptionConfig = defaultSubscriptionConfig,
+ parallelism: Int = 1,
+ minBackoff: FiniteDuration = 3.seconds,
+ maxBackoff: FiniteDuration = 30.seconds,
+ randomFactor: Double = 0.2,
+ maxRestarts: Int = 20
+ )(processMessage: A => Future[_])(implicit mat: Materializer): NotUsed = {
+ runWithRestart(topic, config, minBackoff, maxBackoff, randomFactor, maxRestarts) {
+ Flow[Message[A]].mapAsync(parallelism) { message =>
+ processMessage(message.data).map(_ => message.id :: Nil)
+ }
+ }
+ }
+}