1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
package xyz.driver.core
package messaging
import akka.stream.scaladsl.{Flow, Source}
import scala.async.Async.{async, await}
import scala.concurrent.Future
/** Utility mixin that will ensure that topics and subscriptions exist before attempting to stream from or to them. */
trait CreateBeforeStream extends StreamBus {
def createTopic(topic: Topic[_]): Future[Unit]
def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit]
override def publish[A](topic: Topic[A]): Flow[A, A, _] = {
def create(): Future[Flow[A, A, _]] = async {
await(createTopic(topic))
super.publish(topic)
}
Flow.lazyInitAsync[A, A, Any](() => create())
}
override def subscribe[A](topic: Topic[A], config: SubscriptionConfig): Source[Message[A], _] = {
def create(): Future[Source[Message[A], _]] = async {
await(createTopic(topic))
await(createSubscription(topic, config))
super.subscribe(topic, config)
}
Source.fromFutureSource[Message[A], Any](create())
}
}
|