blob: bd8e4229f84340d2d84eed80836556c80016e820 (
plain) (
tree)
|
|
package xyz.driver.core
package messaging
import akka.stream.scaladsl.{Flow, Source}
import scala.async.Async.{async, await}
import scala.concurrent.Future
/** Utility mixin that will ensure that topics and subscriptions exist before attempting to stream from or to them. */
trait CreateBeforeStream extends StreamBus {
def createTopic(topic: Topic[_]): Future[Unit]
def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit]
override def publish[A](topic: Topic[A]): Flow[A, A, _] = {
def create(): Future[Flow[A, A, _]] = async {
await(createTopic(topic))
super.publish(topic)
}
Flow.lazyInitAsync[A, A, Any](() => create())
}
override def subscribe[A](topic: Topic[A], config: SubscriptionConfig): Source[Message[A], _] = {
def create(): Future[Source[Message[A], _]] = async {
await(createTopic(topic))
await(createSubscription(topic, config))
super.subscribe(topic, config)
}
Source.fromFutureSource[Message[A], Any](create())
}
}
|