aboutsummaryrefslogblamecommitdiff
path: root/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
blob: 44d75cd92b51fde650afa6a7b0158cafe4a79cc4 (plain) (tree)
1
2
3
4
5
6
7
8
9
10


                       
                   

                                                               

                                          

                                  

















                                                                               

                                                        

                                                                            








                                                                      
                                               






                                                                                           


                                                                                              









                                                                                             















                                                                                                        
                              





















                                                                                      
 
package xyz.driver.core
package messaging

import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, RestartSource, Sink, Source}

import scala.collection.mutable.ListBuffer
import scala.concurrent.Future
import scala.concurrent.duration._

/** An extension to message buses that offers an Akka-Streams API.
  *
  * Example usage of a stream that subscribes to one topic, prints any messages
  * it receives and finally acknowledges them
  * their receipt.
  * {{{
  *   val bus: StreamBus = ???
  *   val topic = Topic.string("topic")
  *   bus.subscribe(topic1)
  *     .map{ msg =>
  *       print(msg.data)
  *       msg
  *     }
  *     .to(bus.acknowledge)
  *     .run()
  * }}} */
trait StreamBus extends Bus {

  /** Flow that publishes any messages to a given topic.
    * Emits messages once they have been published to the underlying bus. */
  def publish[A](topic: Topic[A]): Flow[A, A, NotUsed] = {
    Flow[A]
      .batch(defaultMaxMessages.toLong, a => ListBuffer[A](a))(_ += _)
      .mapAsync(1) { a =>
        publishMessages(topic, a).map(_ => a)
      }
      .mapConcat(list => list.toList)
  }

  /** Sink that acknowledges the receipt of a message. */
  def acknowledge: Sink[MessageId, NotUsed] = {
    Flow[MessageId]
      .batch(defaultMaxMessages.toLong, a => ListBuffer[MessageId](a))(_ += _)
      .mapAsync(1)(acknowledgeMessages(_))
      .to(Sink.ignore)
  }

  /** Source that listens to a subscription and receives any messages sent to its topic. */
  def subscribe[A](
      topic: Topic[A],
      config: SubscriptionConfig = defaultSubscriptionConfig): Source[Message[A], NotUsed] = {
    Source
      .unfoldAsync((topic, config))(
        topicAndConfig =>
          fetchMessages(topicAndConfig._1, topicAndConfig._2, defaultMaxMessages).map(msgs =>
            Some(topicAndConfig -> msgs))
      )
      .filter(_.nonEmpty)
      .mapConcat(messages => messages.toList)
  }

  def runWithRestart[A](
      topic: Topic[A],
      config: SubscriptionConfig = defaultSubscriptionConfig,
      minBackoff: FiniteDuration = 3.seconds,
      maxBackoff: FiniteDuration = 30.seconds,
      randomFactor: Double = 0.2,
      maxRestarts: Int = 20
  )(processMessage: Flow[Message[A], List[MessageId], NotUsed])(implicit mat: Materializer): NotUsed = {
    RestartSource
      .withBackoff[MessageId](
        minBackoff,
        maxBackoff,
        randomFactor,
        maxRestarts
      ) { () =>
        subscribe(topic, config)
          .via(processMessage)
          .log(topic.name)
          .mapConcat(identity)
      }
      .to(acknowledge)
      .run()
  }

  def handleMessage[A](
      topic: Topic[A],
      config: SubscriptionConfig = defaultSubscriptionConfig,
      parallelism: Int = 1,
      minBackoff: FiniteDuration = 3.seconds,
      maxBackoff: FiniteDuration = 30.seconds,
      randomFactor: Double = 0.2,
      maxRestarts: Int = 20
  )(processMessage: A => Future[_])(implicit mat: Materializer): NotUsed = {
    runWithRestart(topic, config, minBackoff, maxBackoff, randomFactor, maxRestarts) {
      Flow[Message[A]].mapAsync(parallelism) { message =>
        processMessage(message.data).map(_ => message.id :: Nil)
      }
    }
  }
}