diff options
Diffstat (limited to 'src/main/scala/xyz/driver/core/messaging/QueueBus.scala')
-rw-r--r-- | src/main/scala/xyz/driver/core/messaging/QueueBus.scala | 126 |
1 files changed, 0 insertions, 126 deletions
diff --git a/src/main/scala/xyz/driver/core/messaging/QueueBus.scala b/src/main/scala/xyz/driver/core/messaging/QueueBus.scala deleted file mode 100644 index 45c9ed5..0000000 --- a/src/main/scala/xyz/driver/core/messaging/QueueBus.scala +++ /dev/null @@ -1,126 +0,0 @@ -package xyz.driver.core -package messaging - -import java.nio.ByteBuffer - -import akka.actor.{Actor, ActorSystem, Props} - -import scala.collection.mutable -import scala.collection.mutable.Map -import scala.concurrent.duration._ -import scala.concurrent.{Future, Promise} - -/** A bus backed by an asynchronous queue. Note that this bus requires a local actor system - * and is intended for testing purposes only. */ -class QueueBus(implicit system: ActorSystem) extends Bus { - import system.dispatcher - - override type SubscriptionConfig = Long - override val defaultSubscriptionConfig: Long = 0 - override val executionContext = system.dispatcher - - override type MessageId = (String, SubscriptionConfig, Long) - type Message[A] = BasicMessage[A] - - private object ActorMessages { - case class Send[A](topic: Topic[A], messages: Seq[A]) - case class Ack(messages: Seq[MessageId]) - case class Fetch[A](topic: Topic[A], cfg: SubscriptionConfig, max: Int, response: Promise[Seq[Message[A]]]) - case object Unack - } - - class Subscription { - val mailbox: mutable.Map[MessageId, ByteBuffer] = mutable.Map.empty - val unacked: mutable.Map[MessageId, ByteBuffer] = mutable.Map.empty - } - - private class BusMaster extends Actor { - var _id = 0L - def nextId(topic: String, cfg: SubscriptionConfig): (String, SubscriptionConfig, Long) = { - _id += 1; (topic, cfg, _id) - } - - val topics: mutable.Map[String, mutable.Map[SubscriptionConfig, Subscription]] = mutable.Map.empty - - def ensureSubscription(topic: String, cfg: SubscriptionConfig): Unit = { - topics.get(topic) match { - case Some(t) => - t.getOrElseUpdate(cfg, new Subscription) - case None => - topics += topic -> mutable.Map.empty - ensureSubscription(topic, cfg) - } - } - - override def preStart(): Unit = { - context.system.scheduler.schedule(1.seconds, 1.seconds) { - self ! ActorMessages.Unack - } - } - - override def receive: Receive = { - case ActorMessages.Send(topic, messages) => - val buffers = messages.map(topic.serialize) - val subscriptions = topics.getOrElse(topic.name, Map.empty) - for ((cfg, subscription) <- subscriptions) { - for (buffer <- buffers) { - subscription.mailbox += nextId(topic.name, cfg) -> buffer - } - } - - case ActorMessages.Fetch(topic, cfg, max, promise) => - ensureSubscription(topic.name, cfg) - val subscription = topics(topic.name)(cfg) - val messages = subscription.mailbox.take(max) - subscription.unacked ++= messages - subscription.mailbox --= messages.map(_._1) - promise.success(messages.toSeq.map { - case (ackId, buffer) => - new Message[Any] { - val id = ackId - val data = topic.deserialize(buffer) - } - }) - - case ActorMessages.Ack(messageIds) => - for (id @ (topic, cfg, _) <- messageIds) { - ensureSubscription(topic, cfg) - val subscription = topics(topic)(cfg) - subscription.unacked -= id - } - - case ActorMessages.Unack => - for ((_, subscriptions) <- topics) { - for ((_, subscription) <- subscriptions) { - subscription.mailbox ++= subscription.unacked - subscription.unacked.clear() - } - } - } - - } - - val actor = system.actorOf(Props(new BusMaster)) - - override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = Future { - actor ! ActorMessages.Send(topic, messages) - } - - override def fetchMessages[A]( - topic: messaging.Topic[A], - config: SubscriptionConfig, - maxMessages: Int): Future[Seq[Message[A]]] = { - val result = Promise[Seq[Message[A]]] - actor ! ActorMessages.Fetch(topic, config, maxMessages, result) - result.future - } - - override def acknowledgeMessages(ids: Seq[MessageId]): Future[Unit] = Future { - actor ! ActorMessages.Ack(ids) - } - -} - -object QueueBus { - def apply(implicit system: ActorSystem) = new QueueBus -} |