aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/xyz/driver/core/messaging/StreamBus.scala')
-rw-r--r--src/main/scala/xyz/driver/core/messaging/StreamBus.scala102
1 files changed, 0 insertions, 102 deletions
diff --git a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
deleted file mode 100644
index 44d75cd..0000000
--- a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-package xyz.driver.core
-package messaging
-
-import akka.NotUsed
-import akka.stream.Materializer
-import akka.stream.scaladsl.{Flow, RestartSource, Sink, Source}
-
-import scala.collection.mutable.ListBuffer
-import scala.concurrent.Future
-import scala.concurrent.duration._
-
-/** An extension to message buses that offers an Akka-Streams API.
- *
- * Example usage of a stream that subscribes to one topic, prints any messages
- * it receives and finally acknowledges them
- * their receipt.
- * {{{
- * val bus: StreamBus = ???
- * val topic = Topic.string("topic")
- * bus.subscribe(topic1)
- * .map{ msg =>
- * print(msg.data)
- * msg
- * }
- * .to(bus.acknowledge)
- * .run()
- * }}} */
-trait StreamBus extends Bus {
-
- /** Flow that publishes any messages to a given topic.
- * Emits messages once they have been published to the underlying bus. */
- def publish[A](topic: Topic[A]): Flow[A, A, NotUsed] = {
- Flow[A]
- .batch(defaultMaxMessages.toLong, a => ListBuffer[A](a))(_ += _)
- .mapAsync(1) { a =>
- publishMessages(topic, a).map(_ => a)
- }
- .mapConcat(list => list.toList)
- }
-
- /** Sink that acknowledges the receipt of a message. */
- def acknowledge: Sink[MessageId, NotUsed] = {
- Flow[MessageId]
- .batch(defaultMaxMessages.toLong, a => ListBuffer[MessageId](a))(_ += _)
- .mapAsync(1)(acknowledgeMessages(_))
- .to(Sink.ignore)
- }
-
- /** Source that listens to a subscription and receives any messages sent to its topic. */
- def subscribe[A](
- topic: Topic[A],
- config: SubscriptionConfig = defaultSubscriptionConfig): Source[Message[A], NotUsed] = {
- Source
- .unfoldAsync((topic, config))(
- topicAndConfig =>
- fetchMessages(topicAndConfig._1, topicAndConfig._2, defaultMaxMessages).map(msgs =>
- Some(topicAndConfig -> msgs))
- )
- .filter(_.nonEmpty)
- .mapConcat(messages => messages.toList)
- }
-
- def runWithRestart[A](
- topic: Topic[A],
- config: SubscriptionConfig = defaultSubscriptionConfig,
- minBackoff: FiniteDuration = 3.seconds,
- maxBackoff: FiniteDuration = 30.seconds,
- randomFactor: Double = 0.2,
- maxRestarts: Int = 20
- )(processMessage: Flow[Message[A], List[MessageId], NotUsed])(implicit mat: Materializer): NotUsed = {
- RestartSource
- .withBackoff[MessageId](
- minBackoff,
- maxBackoff,
- randomFactor,
- maxRestarts
- ) { () =>
- subscribe(topic, config)
- .via(processMessage)
- .log(topic.name)
- .mapConcat(identity)
- }
- .to(acknowledge)
- .run()
- }
-
- def handleMessage[A](
- topic: Topic[A],
- config: SubscriptionConfig = defaultSubscriptionConfig,
- parallelism: Int = 1,
- minBackoff: FiniteDuration = 3.seconds,
- maxBackoff: FiniteDuration = 30.seconds,
- randomFactor: Double = 0.2,
- maxRestarts: Int = 20
- )(processMessage: A => Future[_])(implicit mat: Materializer): NotUsed = {
- runWithRestart(topic, config, minBackoff, maxBackoff, randomFactor, maxRestarts) {
- Flow[Message[A]].mapAsync(parallelism) { message =>
- processMessage(message.data).map(_ => message.id :: Nil)
- }
- }
- }
-}