// path: root/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala
// blob: 1af5308c855fb45567362a9efc2137fc3302c4e4
package xyz.driver.core
package messaging

import java.util.concurrent.ConcurrentHashMap

import scala.async.Async.{async, await}
import scala.concurrent.Future

/** Utility mixin that ensures that topics and subscriptions exist before
  * attempting to publish to or fetch from them.
  *
  * Every creation attempt is cached as a future and shared between callers, so that
  * `createTopic` / `createSubscription` are not re-invoked while an attempt is pending or
  * after it has succeeded. A failed attempt is evicted from the cache, so that a later call
  * retries the creation instead of observing the same failure forever.
  *
  * This is a stackable trait: `publishMessages` and `fetchMessages` delegate to the
  * implementation they are mixed into via `super`.
  */
trait CreateOnDemand extends Bus {

  /** Create the given topic. This operation is idempotent and is expected to succeed if the topic
    * already exists.
    */
  def createTopic(topic: Topic[_]): Future[Unit]

  /** Create the given subscription. This operation is idempotent and is expected to succeed if the subscription
    * already exists.
    */
  def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit]

  // Pending or successful creation attempts, keyed by what is being created.
  private val createdTopics        = new ConcurrentHashMap[Topic[_], Future[Unit]]
  private val createdSubscriptions = new ConcurrentHashMap[(Topic[_], SubscriptionConfig), Future[Unit]]

  /** Return the cached creation attempt for `key`, starting one with `create` if none is cached.
    *
    * @param cache  the map holding the creation attempts
    * @param key    the entity that must exist
    * @param create starts the creation of the entity identified by its argument
    * @return the shared future of the creation attempt; if it fails, the entry is removed from
    *         `cache` so that the next call starts a fresh attempt
    */
  private def createOnce[K](cache: ConcurrentHashMap[K, Future[Unit]], key: K)(
      create: K => Future[Unit]): Future[Unit] = {
    val creation = cache.computeIfAbsent(key, k => create(k))
    // The eviction is registered here rather than inside the mapping function, because
    // computeIfAbsent's mapping function must not modify the map it is computing for.
    // Attempts that are already known to have succeeded need no callback at all.
    if (!creation.value.exists(_.isSuccess)) {
      creation.failed.foreach { _ =>
        // The two-argument remove only evicts this very attempt, never a newer retry.
        cache.remove(key, creation)
        ()
      }
    }
    creation
  }

  /** Ensure that `topic` has been created, creating it on first use. */
  private def ensureTopic(topic: Topic[_]): Future[Unit] =
    createOnce[Topic[_]](createdTopics, topic)(t => createTopic(t))

  /** Ensure that the subscription described by `config` has been created on `topic`. */
  private def ensureSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] =
    createOnce[(Topic[_], SubscriptionConfig)](createdSubscriptions, topic -> config) {
      case (t, c) => createSubscription(t, c)
    }

  /** Publish `messages` to `topic`, after making sure that the topic exists.
    *
    * The returned future fails if either the topic creation or the underlying publish fails.
    */
  abstract override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = async {
    await(ensureTopic(topic))
    await(super.publishMessages(topic, messages))
  }

  /** Fetch up to `maxMessages` messages from `topic`, after making sure that first the topic
    * and then the subscription described by `config` exist.
    *
    * The returned future fails if any of the creation steps or the underlying fetch fails.
    */
  abstract override def fetchMessages[A](
      topic: Topic[A],
      config: SubscriptionConfig,
      maxMessages: Int): Future[Seq[Message[A]]] = async {
    await(ensureTopic(topic))
    await(ensureSubscription(topic, config))
    await(super.fetchMessages(topic, config, maxMessages))
  }

}