aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
blob: a9ba3a7f1fa0380a779f0384c40eeed365cc2fd7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package xyz.driver.core
package messaging

import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, RestartSource, Sink, Source}

import scala.collection.mutable.ListBuffer
import scala.concurrent.Future
import scala.concurrent.duration._

/** An extension to message buses that offers an Akka-Streams API.
  *
  * Example usage of a stream that subscribes to one topic, prints any messages
  * it receives and finally acknowledges them
  * their receipt.
  * {{{
  *   val bus: StreamBus = ???
  *   val topic = Topic.string("topic")
  *   bus.subscribe(topic1)
  *     .map{ msg =>
  *       print(msg.data)
  *       msg
  *     }
  *     .to(bus.acknowledge)
  *     .run()
  * }}} */
trait StreamBus extends Bus {

  /** Flow that publishes any messages to a given topic.
    * Emits messages once they have been published to the underlying bus. */
  def publish[A](topic: Topic[A]): Flow[A, A, NotUsed] = {
    Flow[A]
      .batch(defaultMaxMessages.toLong, a => ListBuffer[A](a))(_ += _)
      .mapAsync(1) { a =>
        publishMessages(topic, a).map(_ => a)
      }
      .mapConcat(list => list.toList)
  }

  /** Sink that acknowledges the receipt of a message. */
  def acknowledge: Sink[MessageId, NotUsed] = {
    Flow[MessageId]
      .batch(defaultMaxMessages.toLong, a => ListBuffer[MessageId](a))(_ += _)
      .mapAsync(1)(acknowledgeMessages(_))
      .to(Sink.ignore)
  }

  /** Source that listens to a subscription and receives any messages sent to its topic. */
  def subscribe[A](
      topic: Topic[A],
      config: SubscriptionConfig = defaultSubscriptionConfig): Source[Message[A], NotUsed] = {
    Source
      .unfoldAsync((topic, config))(
        topicAndConfig =>
          fetchMessages(topicAndConfig._1, topicAndConfig._2, defaultMaxMessages).map(msgs =>
            Some(topicAndConfig -> msgs))
      )
      .filter(_.nonEmpty)
      .mapConcat(messages => messages.toList)
  }

  def runWithRestart[A](
      topic: Topic[A],
      config: SubscriptionConfig = defaultSubscriptionConfig,
      minBackoff: FiniteDuration = 3.seconds,
      maxBackoff: FiniteDuration = 30.seconds,
      randomFactor: Double = 0.2,
      maxRestarts: Int = 20
  )(processMessage: Flow[Message[A], List[MessageId], NotUsed])(implicit mat: Materializer): NotUsed = {
    RestartSource
      .withBackoff[MessageId](
        minBackoff,
        maxBackoff,
        randomFactor,
        maxRestarts
      ) { () =>
        subscribe(topic, config)
          .via(processMessage.recover({ case _ => Nil }))
          .log(topic.name)
          .mapConcat(identity)
      }
      .to(acknowledge)
      .run()
  }

  def handleMessage[A](
      topic: Topic[A],
      config: SubscriptionConfig = defaultSubscriptionConfig,
      parallelism: Int = 1,
      minBackoff: FiniteDuration = 3.seconds,
      maxBackoff: FiniteDuration = 30.seconds,
      randomFactor: Double = 0.2,
      maxRestarts: Int = 20
  )(processMessage: A => Future[_])(implicit mat: Materializer): NotUsed = {
    runWithRestart(topic, config, minBackoff, maxBackoff, randomFactor, maxRestarts) {
      Flow[Message[A]].mapAsync(parallelism) { message =>
        processMessage(message.data).map(_ => message.id :: Nil)
      }
    }
  }
}