diff options
Diffstat (limited to 'src/main/scala/xyz/driver/core/messaging/StreamBus.scala')
-rw-r--r-- | src/main/scala/xyz/driver/core/messaging/StreamBus.scala | 59 |
1 files changed, 59 insertions, 0 deletions
diff --git a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala new file mode 100644 index 0000000..aa99960 --- /dev/null +++ b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala @@ -0,0 +1,59 @@ +package xyz.driver.core +package messaging + +import akka.stream.scaladsl.{Flow, Sink, Source} + +import scala.collection.mutable.ListBuffer +import scala.concurrent.ExecutionContext + +/** An extension to message buses that offers an Akka-Streams API. + * + * Example usage of a stream that subscribes to one topic, prints any messages + * it receives and finally acknowledges them + * their receipt. + * {{{ + * val bus: StreamBus = ??? + * val topic = Topic.string("topic") + * bus.subscribe(topic1) + * .map{ msg => + * print(msg.data) + * msg + * } + * .to(bus.acknowledge) + * .run() + * }}} */ +trait StreamBus extends Bus { + implicit def executionContext: ExecutionContext + + /** Flow that publishes any messages to a given topic. + * Emits messages once have been published to the underlying bus. */ + def publish[A](topic: Topic[A]): Flow[A, A, _] = { + Flow[A] + .batch(defaultMaxMessages.toLong, a => ListBuffer[A](a))(_ += _) + .mapAsync(1) { a => + publishMessages(topic, a).map(_ => a) + } + .mapConcat(list => list.toList) + } + + /** Sink that acknowledges the receipt of a message. */ + def acknowledge: Sink[MessageId, _] = { + Flow[MessageId] + .batch(defaultMaxMessages.toLong, a => ListBuffer[MessageId](a))(_ += _) + .mapAsync(1)(acknowledgeMessages(_)) + .to(Sink.ignore) + } + + /** Source that listens to a subscription and receives any messages sent to its topic. */ + def subscribe[A](topic: Topic[A], config: SubscriptionConfig = defaultSubscriptionConfig): Source[Message[A], _] = { + Source + .unfoldAsync((topic, config))( + topicAndConfig => + fetchMessages(topicAndConfig._1, topicAndConfig._2, defaultMaxMessages).map(msgs => + Some(topicAndConfig -> msgs)) + ) + .filter(_.nonEmpty) + .mapConcat(messages => messages.toList) + } + +} |