aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakob Odersky <jakob@odersky.com>2018-09-10 15:10:09 -0700
committerGitHub <noreply@github.com>2018-09-10 15:10:09 -0700
commitf07bee564b11ee76fd065ec849a888bcf4e74e85 (patch)
tree538330c7592ccd4f555a07bf03aa91cace403be5
parent93ffc0a5c4b54a8beea10c3fd68e7a2d70c4c771 (diff)
downloaddriver-core-f07bee564b11ee76fd065ec849a888bcf4e74e85.tar.gz
driver-core-f07bee564b11ee76fd065ec849a888bcf4e74e85.tar.bz2
driver-core-f07bee564b11ee76fd065ec849a888bcf4e74e85.zip
Various message bus fixes (#212)v1.14.0
1. Move to pure mixin-based ("stackable traits") pattern. 2. Provide a "CreateOnDemand" mixin that ensures topics and subscriptions have been created before they are used.
-rw-r--r--src/main/scala/xyz/driver/core/messaging/AliyunBus.scala6
-rw-r--r--src/main/scala/xyz/driver/core/messaging/Bus.scala5
-rw-r--r--src/main/scala/xyz/driver/core/messaging/CreateBeforeStream.scala32
-rw-r--r--src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala49
-rw-r--r--src/main/scala/xyz/driver/core/messaging/GoogleBus.scala6
-rw-r--r--src/main/scala/xyz/driver/core/messaging/QueueBus.scala1
-rw-r--r--src/main/scala/xyz/driver/core/messaging/StreamBus.scala13
7 files changed, 67 insertions, 45 deletions
diff --git a/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala b/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala
index 27e66f6..92e47bd 100644
--- a/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala
+++ b/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala
@@ -17,7 +17,7 @@ class AliyunBus(
region: String,
namespace: String,
pullTimeout: Int)(implicit val executionContext: ExecutionContext)
- extends Bus with StreamBus with CreateBeforeStream {
+ extends Bus {
val endpoint = s"https://$accountId.mns.$region.aliyuncs.com"
val cloudAccount = new CloudAccount(accessId, accessSecret, endpoint)
val client = cloudAccount.getMNSClient
@@ -115,7 +115,7 @@ class AliyunBus(
Future.sequence(publishMessages).map(_ => ())
}
- override def createTopic(topic: Topic[_]): Future[Unit] = Future {
+ def createTopic(topic: Topic[_]): Future[Unit] = Future {
val topicName = rawTopicName(topic)
val topicExists = Option(client.listTopic(topicName, "", 1)).exists(!_.getResult.isEmpty)
if (!topicExists) {
@@ -125,7 +125,7 @@ class AliyunBus(
}
}
- override def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = Future {
+ def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = Future {
val subscriptionName = rawSubscriptionName(config, topic)
val topicName = rawTopicName(topic)
val topicRef = client.getTopicRef(topicName)
diff --git a/src/main/scala/xyz/driver/core/messaging/Bus.scala b/src/main/scala/xyz/driver/core/messaging/Bus.scala
index 599af92..75954f4 100644
--- a/src/main/scala/xyz/driver/core/messaging/Bus.scala
+++ b/src/main/scala/xyz/driver/core/messaging/Bus.scala
@@ -16,7 +16,7 @@ trait Bus {
/** Most general kind of message. Any implementation of a message bus must provide
* the fields and methods specified in this trait. */
- trait BasicMessage[A] {
+ trait BasicMessage[A] { self: Message[A] =>
/** All messages must have unique IDs so that they can be acknowledged unambiguously. */
def id: MessageId
@@ -41,6 +41,9 @@ trait Bus {
/** Maximum amount of messages handled in a single retrieval call. */
val defaultMaxMessages = 64
+ /** Execution context that is used to query and dispatch messages from this bus. */
+ implicit val executionContext: ExecutionContext
+
/** Retrieve any new messages in the mailbox of a subscription.
*
* Any retrieved messages become "outstanding" and should not be returned by this function
diff --git a/src/main/scala/xyz/driver/core/messaging/CreateBeforeStream.scala b/src/main/scala/xyz/driver/core/messaging/CreateBeforeStream.scala
deleted file mode 100644
index bd8e422..0000000
--- a/src/main/scala/xyz/driver/core/messaging/CreateBeforeStream.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-package xyz.driver.core
-package messaging
-
-import akka.stream.scaladsl.{Flow, Source}
-
-import scala.async.Async.{async, await}
-import scala.concurrent.Future
-
-/** Utility mixin that will ensure that topics and subscriptions exist before attempting to stream from or to them. */
-trait CreateBeforeStream extends StreamBus {
-
- def createTopic(topic: Topic[_]): Future[Unit]
- def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit]
-
- override def publish[A](topic: Topic[A]): Flow[A, A, _] = {
- def create(): Future[Flow[A, A, _]] = async {
- await(createTopic(topic))
- super.publish(topic)
- }
- Flow.lazyInitAsync[A, A, Any](() => create())
- }
-
- override def subscribe[A](topic: Topic[A], config: SubscriptionConfig): Source[Message[A], _] = {
- def create(): Future[Source[Message[A], _]] = async {
- await(createTopic(topic))
- await(createSubscription(topic, config))
- super.subscribe(topic, config)
- }
- Source.fromFutureSource[Message[A], Any](create())
- }
-
-}
diff --git a/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala b/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala
new file mode 100644
index 0000000..1af5308
--- /dev/null
+++ b/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala
@@ -0,0 +1,49 @@
+package xyz.driver.core
+package messaging
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.async.Async.{async, await}
+import scala.concurrent.Future
+
+/** Utility mixin that will ensure that topics and subscriptions exist before
+ * attempting to read or write from or to them.
+ */
+trait CreateOnDemand extends Bus {
+
+ /** Create the given topic. This operation is idempotent and is expected to succeed if the topic
+ * already exists.
+ */
+ def createTopic(topic: Topic[_]): Future[Unit]
+
+ /** Create the given subscription. This operation is idempotent and is expected to succeed if the subscription
+ * already exists.
+ */
+ def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit]
+
+ private val createdTopics = new ConcurrentHashMap[Topic[_], Future[Unit]]
+ private val createdSubscriptions = new ConcurrentHashMap[(Topic[_], SubscriptionConfig), Future[Unit]]
+
+ private def ensureTopic(topic: Topic[_]) =
+ createdTopics.computeIfAbsent(topic, t => createTopic(t))
+
+ private def ensureSubscription(topic: Topic[_], config: SubscriptionConfig) =
+ createdSubscriptions.computeIfAbsent(topic -> config, {
+ case (t, c) => createSubscription(t, c)
+ })
+
+ abstract override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = async {
+ await(ensureTopic(topic))
+ await(super.publishMessages(topic, messages))
+ }
+
+ abstract override def fetchMessages[A](
+ topic: Topic[A],
+ config: SubscriptionConfig,
+ maxMessages: Int): Future[Seq[Message[A]]] = async {
+ await(ensureTopic(topic))
+ await(ensureSubscription(topic, config))
+ await(super.fetchMessages(topic, config, maxMessages))
+ }
+
+}
diff --git a/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala b/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala
index cf60e59..9895708 100644
--- a/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala
+++ b/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala
@@ -58,7 +58,7 @@ class GoogleBus(
namespace: String,
pullTimeout: Duration = 90.seconds
)(implicit val executionContext: ExecutionContext, backend: SttpBackend[Future, _])
- extends Bus with StreamBus with CreateBeforeStream {
+ extends Bus {
import GoogleBus.Protocol
case class MessageId(subscription: String, ackId: String)
@@ -133,7 +133,7 @@ class GoogleBus(
.bearer(await(getToken()))
val result = await(request.send())
result.body match {
- case Left(error) if result.code != 409 => // 409 <=> topic already exists
+ case Left(error) if result.code != 409 => // 409 <=> topic already exists, ignore it
throw new NoSuchElementException(s"Error creating topic: Status code ${result.code}: $error")
case _ => ()
}
@@ -152,7 +152,7 @@ class GoogleBus(
)
val result = await(request.send())
result.body match {
- case Left(error) if result.code != 409 => // 409 <=> subscription already exists
+ case Left(error) if result.code != 409 => // 409 <=> subscription already exists, ignore it
throw new NoSuchElementException(s"Error creating subscription: Status code ${result.code}: $error")
case _ => ()
}
diff --git a/src/main/scala/xyz/driver/core/messaging/QueueBus.scala b/src/main/scala/xyz/driver/core/messaging/QueueBus.scala
index d2ad073..45c9ed5 100644
--- a/src/main/scala/xyz/driver/core/messaging/QueueBus.scala
+++ b/src/main/scala/xyz/driver/core/messaging/QueueBus.scala
@@ -17,6 +17,7 @@ class QueueBus(implicit system: ActorSystem) extends Bus {
override type SubscriptionConfig = Long
override val defaultSubscriptionConfig: Long = 0
+ override val executionContext = system.dispatcher
override type MessageId = (String, SubscriptionConfig, Long)
type Message[A] = BasicMessage[A]
diff --git a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
index aa99960..aabd3dc 100644
--- a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
+++ b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
@@ -1,10 +1,10 @@
package xyz.driver.core
package messaging
+import akka.NotUsed
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.collection.mutable.ListBuffer
-import scala.concurrent.ExecutionContext
/** An extension to message buses that offers an Akka-Streams API.
*
@@ -23,11 +23,10 @@ import scala.concurrent.ExecutionContext
* .run()
* }}} */
trait StreamBus extends Bus {
- implicit def executionContext: ExecutionContext
/** Flow that publishes any messages to a given topic.
- * Emits messages once have been published to the underlying bus. */
- def publish[A](topic: Topic[A]): Flow[A, A, _] = {
+ * Emits messages once they have been published to the underlying bus. */
+ def publish[A](topic: Topic[A]): Flow[A, A, NotUsed] = {
Flow[A]
.batch(defaultMaxMessages.toLong, a => ListBuffer[A](a))(_ += _)
.mapAsync(1) { a =>
@@ -37,7 +36,7 @@ trait StreamBus extends Bus {
}
/** Sink that acknowledges the receipt of a message. */
- def acknowledge: Sink[MessageId, _] = {
+ def acknowledge: Sink[MessageId, NotUsed] = {
Flow[MessageId]
.batch(defaultMaxMessages.toLong, a => ListBuffer[MessageId](a))(_ += _)
.mapAsync(1)(acknowledgeMessages(_))
@@ -45,7 +44,9 @@ trait StreamBus extends Bus {
}
/** Source that listens to a subscription and receives any messages sent to its topic. */
- def subscribe[A](topic: Topic[A], config: SubscriptionConfig = defaultSubscriptionConfig): Source[Message[A], _] = {
+ def subscribe[A](
+ topic: Topic[A],
+ config: SubscriptionConfig = defaultSubscriptionConfig): Source[Message[A], NotUsed] = {
Source
.unfoldAsync((topic, config))(
topicAndConfig =>