aboutsummaryrefslogtreecommitdiff
path: root/core-messaging/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala
diff options
context:
space:
mode:
authorJakob Odersky <jakob@driver.xyz>2018-09-12 16:18:26 -0700
committerJakob Odersky <jakob@odersky.com>2018-10-09 16:19:39 -0700
commit7c755c77afbd67ae2ded9d8b004736d4e27e208f (patch)
treee93f4590165a338ed284adeb6f4a6bd43bb16b6a /core-messaging/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala
parent76db2360364a35be31414a12cbc419a534a51744 (diff)
downloaddriver-core-7c755c77afbd67ae2ded9d8b004736d4e27e208f.tar.gz
driver-core-7c755c77afbd67ae2ded9d8b004736d4e27e208f.tar.bz2
driver-core-7c755c77afbd67ae2ded9d8b004736d4e27e208f.zip
Move storage and messaging to separate projects
Diffstat (limited to 'core-messaging/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala')
-rw-r--r--core-messaging/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala49
1 files changed, 49 insertions, 0 deletions
diff --git a/core-messaging/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala b/core-messaging/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala
new file mode 100644
index 0000000..1af5308
--- /dev/null
+++ b/core-messaging/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala
@@ -0,0 +1,49 @@
+package xyz.driver.core
+package messaging
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.async.Async.{async, await}
+import scala.concurrent.Future
+
+/** Utility mixin that will ensure that topics and subscriptions exist before
+ * attempting to read or write from or to them.
+ */
+trait CreateOnDemand extends Bus {
+
+ /** Create the given topic. This operation is idempotent and is expected to succeed if the topic
+ * already exists.
+ */
+ def createTopic(topic: Topic[_]): Future[Unit]
+
+ /** Create the given subscription. This operation is idempotent and is expected to succeed if the subscription
+ * already exists.
+ */
+ def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit]
+
+ private val createdTopics = new ConcurrentHashMap[Topic[_], Future[Unit]]
+ private val createdSubscriptions = new ConcurrentHashMap[(Topic[_], SubscriptionConfig), Future[Unit]]
+
+ private def ensureTopic(topic: Topic[_]) =
+ createdTopics.computeIfAbsent(topic, t => createTopic(t))
+
+ private def ensureSubscription(topic: Topic[_], config: SubscriptionConfig) =
+ createdSubscriptions.computeIfAbsent(topic -> config, {
+ case (t, c) => createSubscription(t, c)
+ })
+
+ abstract override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = async {
+ await(ensureTopic(topic))
+ await(super.publishMessages(topic, messages))
+ }
+
+ abstract override def fetchMessages[A](
+ topic: Topic[A],
+ config: SubscriptionConfig,
+ maxMessages: Int): Future[Seq[Message[A]]] = async {
+ await(ensureTopic(topic))
+ await(ensureSubscription(topic, config))
+ await(super.fetchMessages(topic, config, maxMessages))
+ }
+
+}