From f07bee564b11ee76fd065ec849a888bcf4e74e85 Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Mon, 10 Sep 2018 15:10:09 -0700 Subject: Various message bus fixes (#212) 1. Move to pure mixin-based ("stackable traits") pattern. 2. Provide a "CreateOnDemand" mixin that ensures topics and subscriptions have been created before they are used. --- .../xyz/driver/core/messaging/AliyunBus.scala | 6 +-- src/main/scala/xyz/driver/core/messaging/Bus.scala | 5 ++- .../driver/core/messaging/CreateBeforeStream.scala | 32 -------------- .../xyz/driver/core/messaging/CreateOnDemand.scala | 49 ++++++++++++++++++++++ .../xyz/driver/core/messaging/GoogleBus.scala | 6 +-- .../scala/xyz/driver/core/messaging/QueueBus.scala | 1 + .../xyz/driver/core/messaging/StreamBus.scala | 13 +++--- 7 files changed, 67 insertions(+), 45 deletions(-) delete mode 100644 src/main/scala/xyz/driver/core/messaging/CreateBeforeStream.scala create mode 100644 src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala diff --git a/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala b/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala index 27e66f6..92e47bd 100644 --- a/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala +++ b/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala @@ -17,7 +17,7 @@ class AliyunBus( region: String, namespace: String, pullTimeout: Int)(implicit val executionContext: ExecutionContext) - extends Bus with StreamBus with CreateBeforeStream { + extends Bus { val endpoint = s"https://$accountId.mns.$region.aliyuncs.com" val cloudAccount = new CloudAccount(accessId, accessSecret, endpoint) val client = cloudAccount.getMNSClient @@ -115,7 +115,7 @@ class AliyunBus( Future.sequence(publishMessages).map(_ => ()) } - override def createTopic(topic: Topic[_]): Future[Unit] = Future { + def createTopic(topic: Topic[_]): Future[Unit] = Future { val topicName = rawTopicName(topic) val topicExists = Option(client.listTopic(topicName, "", 1)).exists(!_.getResult.isEmpty) if (!topicExists) { @@ -125,7 +125,7 @@ class AliyunBus( } } - override def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = Future { + def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = Future { val subscriptionName = rawSubscriptionName(config, topic) val topicName = rawTopicName(topic) val topicRef = client.getTopicRef(topicName) diff --git a/src/main/scala/xyz/driver/core/messaging/Bus.scala b/src/main/scala/xyz/driver/core/messaging/Bus.scala index 599af92..75954f4 100644 --- a/src/main/scala/xyz/driver/core/messaging/Bus.scala +++ b/src/main/scala/xyz/driver/core/messaging/Bus.scala @@ -16,7 +16,7 @@ trait Bus { /** Most general kind of message. Any implementation of a message bus must provide * the fields and methods specified in this trait. */ - trait BasicMessage[A] { + trait BasicMessage[A] { self: Message[A] => /** All messages must have unique IDs so that they can be acknowledged unambiguously. */ def id: MessageId @@ -41,6 +41,9 @@ trait Bus { /** Maximum amount of messages handled in a single retrieval call. */ val defaultMaxMessages = 64 + /** Execution context that is used to query and dispatch messages from this bus. */ + implicit val executionContext: ExecutionContext + /** Retrieve any new messages in the mailbox of a subscription. * * Any retrieved messages become "outstanding" and should not be returned by this function diff --git a/src/main/scala/xyz/driver/core/messaging/CreateBeforeStream.scala b/src/main/scala/xyz/driver/core/messaging/CreateBeforeStream.scala deleted file mode 100644 index bd8e422..0000000 --- a/src/main/scala/xyz/driver/core/messaging/CreateBeforeStream.scala +++ /dev/null @@ -1,32 +0,0 @@ -package xyz.driver.core -package messaging - -import akka.stream.scaladsl.{Flow, Source} - -import scala.async.Async.{async, await} -import scala.concurrent.Future - -/** Utility mixin that will ensure that topics and subscriptions exist before attempting to stream from or to them. */ -trait CreateBeforeStream extends StreamBus { - - def createTopic(topic: Topic[_]): Future[Unit] - def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] - - override def publish[A](topic: Topic[A]): Flow[A, A, _] = { - def create(): Future[Flow[A, A, _]] = async { - await(createTopic(topic)) - super.publish(topic) - } - Flow.lazyInitAsync[A, A, Any](() => create()) - } - - override def subscribe[A](topic: Topic[A], config: SubscriptionConfig): Source[Message[A], _] = { - def create(): Future[Source[Message[A], _]] = async { - await(createTopic(topic)) - await(createSubscription(topic, config)) - super.subscribe(topic, config) - } - Source.fromFutureSource[Message[A], Any](create()) - } - -} diff --git a/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala b/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala new file mode 100644 index 0000000..1af5308 --- /dev/null +++ b/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala @@ -0,0 +1,49 @@ +package xyz.driver.core +package messaging + +import java.util.concurrent.ConcurrentHashMap + +import scala.async.Async.{async, await} +import scala.concurrent.Future + +/** Utility mixin that will ensure that topics and subscriptions exist before + * attempting to read or write from or to them. + */ +trait CreateOnDemand extends Bus { + + /** Create the given topic. This operation is idempotent and is expected to succeed if the topic + * already exists. + */ + def createTopic(topic: Topic[_]): Future[Unit] + + /** Create the given subscription. This operation is idempotent and is expected to succeed if the subscription + * already exists. + */ + def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] + + private val createdTopics = new ConcurrentHashMap[Topic[_], Future[Unit]] + private val createdSubscriptions = new ConcurrentHashMap[(Topic[_], SubscriptionConfig), Future[Unit]] + + private def ensureTopic(topic: Topic[_]) = + createdTopics.computeIfAbsent(topic, t => createTopic(t)) + + private def ensureSubscription(topic: Topic[_], config: SubscriptionConfig) = + createdSubscriptions.computeIfAbsent(topic -> config, { + case (t, c) => createSubscription(t, c) + }) + + abstract override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = async { + await(ensureTopic(topic)) + await(super.publishMessages(topic, messages)) + } + + abstract override def fetchMessages[A]( + topic: Topic[A], + config: SubscriptionConfig, + maxMessages: Int): Future[Seq[Message[A]]] = async { + await(ensureTopic(topic)) + await(ensureSubscription(topic, config)) + await(super.fetchMessages(topic, config, maxMessages)) + } + +} diff --git a/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala b/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala index cf60e59..9895708 100644 --- a/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala +++ b/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala @@ -58,7 +58,7 @@ class GoogleBus( namespace: String, pullTimeout: Duration = 90.seconds )(implicit val executionContext: ExecutionContext, backend: SttpBackend[Future, _]) - extends Bus with StreamBus with CreateBeforeStream { + extends Bus { import GoogleBus.Protocol case class MessageId(subscription: String, ackId: String) @@ -133,7 +133,7 @@ class GoogleBus( .bearer(await(getToken())) val result = await(request.send()) result.body match { - case Left(error) if result.code != 409 => // 409 <=> topic already exists + case Left(error) if result.code != 409 => // 409 <=> topic already exists, ignore it throw new NoSuchElementException(s"Error creating topic: Status code ${result.code}: $error") case _ => () } @@ -152,7 +152,7 @@ class GoogleBus( ) val result = await(request.send()) result.body match { - case Left(error) if result.code != 409 => // 409 <=> subscription already exists + case Left(error) if result.code != 409 => // 409 <=> subscription already exists, ignore it throw new NoSuchElementException(s"Error creating subscription: Status code ${result.code}: $error") case _ => () } diff --git a/src/main/scala/xyz/driver/core/messaging/QueueBus.scala b/src/main/scala/xyz/driver/core/messaging/QueueBus.scala index d2ad073..45c9ed5 100644 --- a/src/main/scala/xyz/driver/core/messaging/QueueBus.scala +++ b/src/main/scala/xyz/driver/core/messaging/QueueBus.scala @@ -17,6 +17,7 @@ class QueueBus(implicit system: ActorSystem) extends Bus { override type SubscriptionConfig = Long override val defaultSubscriptionConfig: Long = 0 + override val executionContext = system.dispatcher override type MessageId = (String, SubscriptionConfig, Long) type Message[A] = BasicMessage[A] diff --git a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala index aa99960..aabd3dc 100644 --- a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala +++ b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala @@ -1,10 +1,10 @@ package xyz.driver.core package messaging +import akka.NotUsed import akka.stream.scaladsl.{Flow, Sink, Source} import scala.collection.mutable.ListBuffer -import scala.concurrent.ExecutionContext /** An extension to message buses that offers an Akka-Streams API. * @@ -23,11 +23,10 @@ import scala.concurrent.ExecutionContext * .run() * }}} */ trait StreamBus extends Bus { - implicit def executionContext: ExecutionContext /** Flow that publishes any messages to a given topic. - * Emits messages once have been published to the underlying bus. */ - def publish[A](topic: Topic[A]): Flow[A, A, _] = { + * Emits messages once they have been published to the underlying bus. */ + def publish[A](topic: Topic[A]): Flow[A, A, NotUsed] = { Flow[A] .batch(defaultMaxMessages.toLong, a => ListBuffer[A](a))(_ += _) .mapAsync(1) { a => @@ -37,7 +36,7 @@ trait StreamBus extends Bus { } /** Sink that acknowledges the receipt of a message. */ - def acknowledge: Sink[MessageId, _] = { + def acknowledge: Sink[MessageId, NotUsed] = { Flow[MessageId] .batch(defaultMaxMessages.toLong, a => ListBuffer[MessageId](a))(_ += _) .mapAsync(1)(acknowledgeMessages(_)) @@ -45,7 +44,9 @@ trait StreamBus extends Bus { } /** Source that listens to a subscription and receives any messages sent to its topic. */ - def subscribe[A](topic: Topic[A], config: SubscriptionConfig = defaultSubscriptionConfig): Source[Message[A], _] = { + def subscribe[A]( + topic: Topic[A], + config: SubscriptionConfig = defaultSubscriptionConfig): Source[Message[A], NotUsed] = { Source .unfoldAsync((topic, config))( topicAndConfig => -- cgit v1.2.3