1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
package xyz.driver.core
package messaging
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext
/** An extension to message buses that offers an Akka-Streams API.
*
* Example usage of a stream that subscribes to one topic, prints any messages
* it receives and finally acknowledges them
* their receipt.
* {{{
* val bus: StreamBus = ???
* val topic = Topic.string("topic")
* bus.subscribe(topic1)
* .map{ msg =>
* print(msg.data)
* msg
* }
* .to(bus.acknowledge)
* .run()
* }}} */
trait StreamBus extends Bus {
implicit def executionContext: ExecutionContext
/** Flow that publishes any messages to a given topic.
* Emits messages once have been published to the underlying bus. */
def publish[A](topic: Topic[A]): Flow[A, A, _] = {
Flow[A]
.batch(defaultMaxMessages.toLong, a => ListBuffer[A](a))(_ += _)
.mapAsync(1) { a =>
publishMessages(topic, a).map(_ => a)
}
.mapConcat(list => list.toList)
}
/** Sink that acknowledges the receipt of a message. */
def acknowledge: Sink[MessageId, _] = {
Flow[MessageId]
.batch(defaultMaxMessages.toLong, a => ListBuffer[MessageId](a))(_ += _)
.mapAsync(1)(acknowledgeMessages(_))
.to(Sink.ignore)
}
/** Source that listens to a subscription and receives any messages sent to its topic. */
def subscribe[A](topic: Topic[A], config: SubscriptionConfig = defaultSubscriptionConfig): Source[Message[A], _] = {
Source
.unfoldAsync((topic, config))(
topicAndConfig =>
fetchMessages(topicAndConfig._1, topicAndConfig._2, defaultMaxMessages).map(msgs =>
Some(topicAndConfig -> msgs))
)
.filter(_.nonEmpty)
.mapConcat(messages => messages.toList)
}
}
|