From e6552f3b31b55396c652c196c5c3a9c3a6cfed71 Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Tue, 31 Jul 2018 12:13:47 -0700 Subject: Add message bus and topic abstractions (#181) --- .../driver/core/messaging/CreateBeforeStream.scala | 32 ++++++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 src/main/scala/xyz/driver/core/messaging/CreateBeforeStream.scala (limited to 'src/main/scala/xyz/driver/core/messaging/CreateBeforeStream.scala') diff --git a/src/main/scala/xyz/driver/core/messaging/CreateBeforeStream.scala b/src/main/scala/xyz/driver/core/messaging/CreateBeforeStream.scala new file mode 100644 index 0000000..bd8e422 --- /dev/null +++ b/src/main/scala/xyz/driver/core/messaging/CreateBeforeStream.scala @@ -0,0 +1,32 @@ +package xyz.driver.core +package messaging + +import akka.stream.scaladsl.{Flow, Source} + +import scala.async.Async.{async, await} +import scala.concurrent.Future + +/** Utility mixin that will ensure that topics and subscriptions exist before attempting to stream from or to them. */ +trait CreateBeforeStream extends StreamBus { + + def createTopic(topic: Topic[_]): Future[Unit] + def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] + + override def publish[A](topic: Topic[A]): Flow[A, A, _] = { + def create(): Future[Flow[A, A, _]] = async { + await(createTopic(topic)) + super.publish(topic) + } + Flow.lazyInitAsync[A, A, Any](() => create()) + } + + override def subscribe[A](topic: Topic[A], config: SubscriptionConfig): Source[Message[A], _] = { + def create(): Future[Source[Message[A], _]] = async { + await(createTopic(topic)) + await(createSubscription(topic, config)) + super.subscribe(topic, config) + } + Source.fromFutureSource[Message[A], Any](create()) + } + +} -- cgit v1.2.3