aboutsummaryrefslogtreecommitdiff
path: root/core-messaging/src/main/scala/xyz/driver/core/messaging/QueueBus.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core-messaging/src/main/scala/xyz/driver/core/messaging/QueueBus.scala')
-rw-r--r--core-messaging/src/main/scala/xyz/driver/core/messaging/QueueBus.scala126
1 files changed, 126 insertions, 0 deletions
diff --git a/core-messaging/src/main/scala/xyz/driver/core/messaging/QueueBus.scala b/core-messaging/src/main/scala/xyz/driver/core/messaging/QueueBus.scala
new file mode 100644
index 0000000..45c9ed5
--- /dev/null
+++ b/core-messaging/src/main/scala/xyz/driver/core/messaging/QueueBus.scala
@@ -0,0 +1,126 @@
+package xyz.driver.core
+package messaging
+
+import java.nio.ByteBuffer
+
+import akka.actor.{Actor, ActorSystem, Props}
+
+import scala.collection.mutable
+import scala.collection.mutable.Map
+import scala.concurrent.duration._
+import scala.concurrent.{Future, Promise}
+
+/** A bus backed by an asynchronous queue. Note that this bus requires a local actor system
+ * and is intended for testing purposes only. */
+class QueueBus(implicit system: ActorSystem) extends Bus {
+ import system.dispatcher
+
+ override type SubscriptionConfig = Long
+ override val defaultSubscriptionConfig: Long = 0
+ override val executionContext = system.dispatcher
+
+ override type MessageId = (String, SubscriptionConfig, Long)
+ type Message[A] = BasicMessage[A]
+
+ private object ActorMessages {
+ case class Send[A](topic: Topic[A], messages: Seq[A])
+ case class Ack(messages: Seq[MessageId])
+ case class Fetch[A](topic: Topic[A], cfg: SubscriptionConfig, max: Int, response: Promise[Seq[Message[A]]])
+ case object Unack
+ }
+
+ class Subscription {
+ val mailbox: mutable.Map[MessageId, ByteBuffer] = mutable.Map.empty
+ val unacked: mutable.Map[MessageId, ByteBuffer] = mutable.Map.empty
+ }
+
+ private class BusMaster extends Actor {
+ var _id = 0L
+ def nextId(topic: String, cfg: SubscriptionConfig): (String, SubscriptionConfig, Long) = {
+ _id += 1; (topic, cfg, _id)
+ }
+
+ val topics: mutable.Map[String, mutable.Map[SubscriptionConfig, Subscription]] = mutable.Map.empty
+
+ def ensureSubscription(topic: String, cfg: SubscriptionConfig): Unit = {
+ topics.get(topic) match {
+ case Some(t) =>
+ t.getOrElseUpdate(cfg, new Subscription)
+ case None =>
+ topics += topic -> mutable.Map.empty
+ ensureSubscription(topic, cfg)
+ }
+ }
+
+ override def preStart(): Unit = {
+ context.system.scheduler.schedule(1.seconds, 1.seconds) {
+ self ! ActorMessages.Unack
+ }
+ }
+
+ override def receive: Receive = {
+ case ActorMessages.Send(topic, messages) =>
+ val buffers = messages.map(topic.serialize)
+ val subscriptions = topics.getOrElse(topic.name, Map.empty)
+ for ((cfg, subscription) <- subscriptions) {
+ for (buffer <- buffers) {
+ subscription.mailbox += nextId(topic.name, cfg) -> buffer
+ }
+ }
+
+ case ActorMessages.Fetch(topic, cfg, max, promise) =>
+ ensureSubscription(topic.name, cfg)
+ val subscription = topics(topic.name)(cfg)
+ val messages = subscription.mailbox.take(max)
+ subscription.unacked ++= messages
+ subscription.mailbox --= messages.map(_._1)
+ promise.success(messages.toSeq.map {
+ case (ackId, buffer) =>
+ new Message[Any] {
+ val id = ackId
+ val data = topic.deserialize(buffer)
+ }
+ })
+
+ case ActorMessages.Ack(messageIds) =>
+ for (id @ (topic, cfg, _) <- messageIds) {
+ ensureSubscription(topic, cfg)
+ val subscription = topics(topic)(cfg)
+ subscription.unacked -= id
+ }
+
+ case ActorMessages.Unack =>
+ for ((_, subscriptions) <- topics) {
+ for ((_, subscription) <- subscriptions) {
+ subscription.mailbox ++= subscription.unacked
+ subscription.unacked.clear()
+ }
+ }
+ }
+
+ }
+
+ val actor = system.actorOf(Props(new BusMaster))
+
+ override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = Future {
+ actor ! ActorMessages.Send(topic, messages)
+ }
+
+ override def fetchMessages[A](
+ topic: messaging.Topic[A],
+ config: SubscriptionConfig,
+ maxMessages: Int): Future[Seq[Message[A]]] = {
+ val result = Promise[Seq[Message[A]]]
+ actor ! ActorMessages.Fetch(topic, config, maxMessages, result)
+ result.future
+ }
+
+ override def acknowledgeMessages(ids: Seq[MessageId]): Future[Unit] = Future {
+ actor ! ActorMessages.Ack(ids)
+ }
+
+}
+
+object QueueBus {
+ def apply(implicit system: ActorSystem) = new QueueBus
+}