aboutsummaryrefslogtreecommitdiff
path: root/core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
diff options
context:
space:
mode:
authorKseniya Tomskikh <ktomskikh@driver.xyz>2018-10-17 17:02:58 +0800
committerKseniya Tomskikh <ktomskikh@driver.xyz>2018-10-17 17:02:58 +0800
commit95c3aeecd7e6ad04ce8d216c09e779f5ca38aa6a (patch)
treedfc94f20d00c84f9dde120f065bfc9298bdff0dc /core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
parentf5d0b038457ed9d01687f0949e22e08a6b116066 (diff)
parenta43556851bf986b81351fc9f1ae5ba51bf21dc47 (diff)
downloaddriver-core-kseniya/typized-id.tar.gz
driver-core-kseniya/typized-id.tar.bz2
driver-core-kseniya/typized-id.zip
Merge branch 'master' into kseniya/typized-idkseniya/typized-id
Diffstat (limited to 'core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala')
-rw-r--r--core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala102
1 files changed, 102 insertions, 0 deletions
diff --git a/core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala b/core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
new file mode 100644
index 0000000..44d75cd
--- /dev/null
+++ b/core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
@@ -0,0 +1,102 @@
+package xyz.driver.core
+package messaging
+
+import akka.NotUsed
+import akka.stream.Materializer
+import akka.stream.scaladsl.{Flow, RestartSource, Sink, Source}
+
+import scala.collection.mutable.ListBuffer
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+/** An extension to message buses that offers an Akka-Streams API.
+ *
+ * Example usage of a stream that subscribes to one topic, prints any messages
+ * it receives and finally acknowledges them
+ * their receipt.
+ * {{{
+ * val bus: StreamBus = ???
+ * val topic = Topic.string("topic")
+ * bus.subscribe(topic1)
+ * .map{ msg =>
+ * print(msg.data)
+ * msg
+ * }
+ * .to(bus.acknowledge)
+ * .run()
+ * }}} */
+trait StreamBus extends Bus {
+
+ /** Flow that publishes any messages to a given topic.
+ * Emits messages once they have been published to the underlying bus. */
+ def publish[A](topic: Topic[A]): Flow[A, A, NotUsed] = {
+ Flow[A]
+ .batch(defaultMaxMessages.toLong, a => ListBuffer[A](a))(_ += _)
+ .mapAsync(1) { a =>
+ publishMessages(topic, a).map(_ => a)
+ }
+ .mapConcat(list => list.toList)
+ }
+
+ /** Sink that acknowledges the receipt of a message. */
+ def acknowledge: Sink[MessageId, NotUsed] = {
+ Flow[MessageId]
+ .batch(defaultMaxMessages.toLong, a => ListBuffer[MessageId](a))(_ += _)
+ .mapAsync(1)(acknowledgeMessages(_))
+ .to(Sink.ignore)
+ }
+
+ /** Source that listens to a subscription and receives any messages sent to its topic. */
+ def subscribe[A](
+ topic: Topic[A],
+ config: SubscriptionConfig = defaultSubscriptionConfig): Source[Message[A], NotUsed] = {
+ Source
+ .unfoldAsync((topic, config))(
+ topicAndConfig =>
+ fetchMessages(topicAndConfig._1, topicAndConfig._2, defaultMaxMessages).map(msgs =>
+ Some(topicAndConfig -> msgs))
+ )
+ .filter(_.nonEmpty)
+ .mapConcat(messages => messages.toList)
+ }
+
+ def runWithRestart[A](
+ topic: Topic[A],
+ config: SubscriptionConfig = defaultSubscriptionConfig,
+ minBackoff: FiniteDuration = 3.seconds,
+ maxBackoff: FiniteDuration = 30.seconds,
+ randomFactor: Double = 0.2,
+ maxRestarts: Int = 20
+ )(processMessage: Flow[Message[A], List[MessageId], NotUsed])(implicit mat: Materializer): NotUsed = {
+ RestartSource
+ .withBackoff[MessageId](
+ minBackoff,
+ maxBackoff,
+ randomFactor,
+ maxRestarts
+ ) { () =>
+ subscribe(topic, config)
+ .via(processMessage)
+ .log(topic.name)
+ .mapConcat(identity)
+ }
+ .to(acknowledge)
+ .run()
+ }
+
+ def handleMessage[A](
+ topic: Topic[A],
+ config: SubscriptionConfig = defaultSubscriptionConfig,
+ parallelism: Int = 1,
+ minBackoff: FiniteDuration = 3.seconds,
+ maxBackoff: FiniteDuration = 30.seconds,
+ randomFactor: Double = 0.2,
+ maxRestarts: Int = 20
+ )(processMessage: A => Future[_])(implicit mat: Materializer): NotUsed = {
+ runWithRestart(topic, config, minBackoff, maxBackoff, randomFactor, maxRestarts) {
+ Flow[Message[A]].mapAsync(parallelism) { message =>
+ processMessage(message.data).map(_ => message.id :: Nil)
+ }
+ }
+ }
+}