diff options
Diffstat (limited to 'core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala')
-rw-r--r-- | core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala | 102 |
1 files changed, 102 insertions, 0 deletions
diff --git a/core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala b/core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala new file mode 100644 index 0000000..44d75cd --- /dev/null +++ b/core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala @@ -0,0 +1,102 @@ +package xyz.driver.core +package messaging + +import akka.NotUsed +import akka.stream.Materializer +import akka.stream.scaladsl.{Flow, RestartSource, Sink, Source} + +import scala.collection.mutable.ListBuffer +import scala.concurrent.Future +import scala.concurrent.duration._ + +/** An extension to message buses that offers an Akka-Streams API. + * + * Example usage of a stream that subscribes to one topic, prints any messages + * it receives and finally acknowledges them + * their receipt. + * {{{ + * val bus: StreamBus = ??? + * val topic = Topic.string("topic") + * bus.subscribe(topic1) + * .map{ msg => + * print(msg.data) + * msg + * } + * .to(bus.acknowledge) + * .run() + * }}} */ +trait StreamBus extends Bus { + + /** Flow that publishes any messages to a given topic. + * Emits messages once they have been published to the underlying bus. */ + def publish[A](topic: Topic[A]): Flow[A, A, NotUsed] = { + Flow[A] + .batch(defaultMaxMessages.toLong, a => ListBuffer[A](a))(_ += _) + .mapAsync(1) { a => + publishMessages(topic, a).map(_ => a) + } + .mapConcat(list => list.toList) + } + + /** Sink that acknowledges the receipt of a message. */ + def acknowledge: Sink[MessageId, NotUsed] = { + Flow[MessageId] + .batch(defaultMaxMessages.toLong, a => ListBuffer[MessageId](a))(_ += _) + .mapAsync(1)(acknowledgeMessages(_)) + .to(Sink.ignore) + } + + /** Source that listens to a subscription and receives any messages sent to its topic. */ + def subscribe[A]( + topic: Topic[A], + config: SubscriptionConfig = defaultSubscriptionConfig): Source[Message[A], NotUsed] = { + Source + .unfoldAsync((topic, config))( + topicAndConfig => + fetchMessages(topicAndConfig._1, topicAndConfig._2, defaultMaxMessages).map(msgs => + Some(topicAndConfig -> msgs)) + ) + .filter(_.nonEmpty) + .mapConcat(messages => messages.toList) + } + + def runWithRestart[A]( + topic: Topic[A], + config: SubscriptionConfig = defaultSubscriptionConfig, + minBackoff: FiniteDuration = 3.seconds, + maxBackoff: FiniteDuration = 30.seconds, + randomFactor: Double = 0.2, + maxRestarts: Int = 20 + )(processMessage: Flow[Message[A], List[MessageId], NotUsed])(implicit mat: Materializer): NotUsed = { + RestartSource + .withBackoff[MessageId]( + minBackoff, + maxBackoff, + randomFactor, + maxRestarts + ) { () => + subscribe(topic, config) + .via(processMessage) + .log(topic.name) + .mapConcat(identity) + } + .to(acknowledge) + .run() + } + + def handleMessage[A]( + topic: Topic[A], + config: SubscriptionConfig = defaultSubscriptionConfig, + parallelism: Int = 1, + minBackoff: FiniteDuration = 3.seconds, + maxBackoff: FiniteDuration = 30.seconds, + randomFactor: Double = 0.2, + maxRestarts: Int = 20 + )(processMessage: A => Future[_])(implicit mat: Materializer): NotUsed = { + runWithRestart(topic, config, minBackoff, maxBackoff, randomFactor, maxRestarts) { + Flow[Message[A]].mapAsync(parallelism) { message => + processMessage(message.data).map(_ => message.id :: Nil) + } + } + } +} |