aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/xyz/driver/core/messaging/StreamBus.scala')
-rw-r--r--src/main/scala/xyz/driver/core/messaging/StreamBus.scala59
1 files changed, 59 insertions, 0 deletions
diff --git a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
new file mode 100644
index 0000000..aa99960
--- /dev/null
+++ b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
@@ -0,0 +1,59 @@
+package xyz.driver.core
+package messaging
+
+import akka.stream.scaladsl.{Flow, Sink, Source}
+
+import scala.collection.mutable.ListBuffer
+import scala.concurrent.ExecutionContext
+
+/** An extension to message buses that offers an Akka-Streams API.
+ *
+ * Example usage of a stream that subscribes to one topic, prints any messages
+ * it receives and finally acknowledges them
+ * their receipt.
+ * {{{
+ * val bus: StreamBus = ???
+ * val topic = Topic.string("topic")
+ * bus.subscribe(topic1)
+ * .map{ msg =>
+ * print(msg.data)
+ * msg
+ * }
+ * .to(bus.acknowledge)
+ * .run()
+ * }}} */
+trait StreamBus extends Bus {
+ implicit def executionContext: ExecutionContext
+
+ /** Flow that publishes any messages to a given topic.
+ * Emits messages once have been published to the underlying bus. */
+ def publish[A](topic: Topic[A]): Flow[A, A, _] = {
+ Flow[A]
+ .batch(defaultMaxMessages.toLong, a => ListBuffer[A](a))(_ += _)
+ .mapAsync(1) { a =>
+ publishMessages(topic, a).map(_ => a)
+ }
+ .mapConcat(list => list.toList)
+ }
+
+ /** Sink that acknowledges the receipt of a message. */
+ def acknowledge: Sink[MessageId, _] = {
+ Flow[MessageId]
+ .batch(defaultMaxMessages.toLong, a => ListBuffer[MessageId](a))(_ += _)
+ .mapAsync(1)(acknowledgeMessages(_))
+ .to(Sink.ignore)
+ }
+
+ /** Source that listens to a subscription and receives any messages sent to its topic. */
+ def subscribe[A](topic: Topic[A], config: SubscriptionConfig = defaultSubscriptionConfig): Source[Message[A], _] = {
+ Source
+ .unfoldAsync((topic, config))(
+ topicAndConfig =>
+ fetchMessages(topicAndConfig._1, topicAndConfig._2, defaultMaxMessages).map(msgs =>
+ Some(topicAndConfig -> msgs))
+ )
+ .filter(_.nonEmpty)
+ .mapConcat(messages => messages.toList)
+ }
+
+}