package xyz.driver.core
package messaging

import java.nio.ByteBuffer

import akka.actor.{Actor, ActorRef, ActorSystem, Cancellable, Props}

import scala.collection.mutable
import scala.collection.mutable.Map
import scala.concurrent.duration._
import scala.concurrent.{Future, Promise}
import scala.util.Try

/** A bus backed by an asynchronous queue. Note that this bus requires a local actor system
  * and is intended for testing purposes only.
  *
  * All queue state lives inside a single actor. Messages that have been fetched but not
  * acknowledged are put back into their subscription's mailbox once per second, so
  * they will be handed out again by a later fetch.
  */
class QueueBus(implicit system: ActorSystem) extends Bus {
  // Provides the implicit ExecutionContext required by the scheduler below.
  import system.dispatcher

  override type SubscriptionConfig = Long
  override val defaultSubscriptionConfig: Long = 0

  /** (topic name, subscription config, sequence number assigned by the bus actor). */
  override type MessageId = (String, SubscriptionConfig, Long)
  type Message[A] = BasicMessage[A]

  /** Protocol understood by the internal [[BusMaster]] actor. */
  private object ActorMessages {
    case class Send[A](topic: Topic[A], messages: Seq[A])
    case class Ack(messages: Seq[MessageId])
    case class Fetch[A](topic: Topic[A], cfg: SubscriptionConfig, max: Int, response: Promise[Seq[Message[A]]])
    case object Unack
  }

  /** Per-(topic, config) queue state.
    *
    * Both maps are insertion-ordered so that a fetch hands out messages in the order in
    * which they were enqueued, rather than in hash order.
    */
  class Subscription {

    /** Messages waiting to be fetched. */
    val mailbox: mutable.Map[MessageId, ByteBuffer] = mutable.LinkedHashMap.empty[MessageId, ByteBuffer]

    /** Messages that were fetched but have not been acknowledged yet. */
    val unacked: mutable.Map[MessageId, ByteBuffer] = mutable.LinkedHashMap.empty[MessageId, ByteBuffer]
  }

  /** Actor owning every subscription; all mutation happens on its message loop. */
  private class BusMaster extends Actor {
    var _id = 0L

    /** Produces a fresh message id for the given topic and subscription. */
    def nextId(topic: String, cfg: SubscriptionConfig): (String, SubscriptionConfig, Long) = {
      _id += 1
      (topic, cfg, _id)
    }

    /** topic name -> subscription config -> subscription state. */
    val topics: mutable.Map[String, mutable.Map[SubscriptionConfig, Subscription]] = mutable.Map.empty

    // Handle of the periodic redelivery timer, kept so that it can be cancelled in postStop.
    private var unackTimer: Option[Cancellable] = None

    /** Returns the subscription for `topic`/`cfg`, creating it (and the topic entry) if absent. */
    def ensureSubscription(topic: String, cfg: SubscriptionConfig): Subscription =
      topics
        .getOrElseUpdate(topic, mutable.Map.empty[SubscriptionConfig, Subscription])
        .getOrElseUpdate(cfg, new Subscription)

    override def preStart(): Unit = {
      unackTimer = Some(context.system.scheduler.schedule(1.seconds, 1.seconds) {
        self ! ActorMessages.Unack
      })
    }

    // Without this the timer would keep firing at a stopped actor, and every restart
    // (which runs preStart again) would add one more timer.
    override def postStop(): Unit = {
      unackTimer.foreach(_.cancel())
      unackTimer = None
    }

    override def receive: Receive = {
      // Enqueue every message into each subscription that currently exists for the topic.
      // Topics without any subscription drop the messages.
      case ActorMessages.Send(topic, messages) =>
        val buffers       = messages.map(topic.serialize)
        val subscriptions = topics.getOrElse(topic.name, Map.empty)
        for {
          (cfg, subscription) <- subscriptions
          buffer              <- buffers
        } {
          subscription.mailbox += nextId(topic.name, cfg) -> buffer
        }

      // Move up to `max` messages from the mailbox to the unacked set and hand them out.
      case ActorMessages.Fetch(topic, cfg, max, promise) =>
        val subscription = ensureSubscription(topic.name, cfg)
        val messages     = subscription.mailbox.take(max)
        subscription.unacked ++= messages
        subscription.mailbox --= messages.map(_._1)
        // Wrapping in Try makes a failing deserializer fail the caller's future instead of
        // throwing out of `receive`, which would leave the promise forever incomplete.
        promise.complete(Try(messages.toSeq.map {
          case (ackId, buffer) =>
            new Message[Any] {
              val id = ackId
              // The same buffer instance is stored in every subscription and is reused on
              // redelivery; hand the deserializer an independent view so that a
              // position-advancing read cannot affect later deliveries.
              val data = topic.deserialize(buffer.duplicate())
            }
        }))

      // Drop acknowledged messages from the unacked set of their subscription.
      case ActorMessages.Ack(messageIds) =>
        for (id @ (topic, cfg, _) <- messageIds) {
          ensureSubscription(topic, cfg).unacked -= id
        }

      // Periodic tick: make every unacknowledged message fetchable again.
      case ActorMessages.Unack =>
        for {
          subscriptions <- topics.values
          subscription  <- subscriptions.values
        } {
          subscription.mailbox ++= subscription.unacked
          subscription.unacked.clear()
        }
    }
  }

  val actor: ActorRef = system.actorOf(Props(new BusMaster))

  /** Enqueues `messages` on `topic`.
    *
    * The message is handed to the bus actor before this method returns, so consecutive
    * calls from the same thread are processed in call order. The returned future is
    * already completed; it does not wait for the actor to process the message.
    */
  override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = {
    actor ! ActorMessages.Send(topic, messages)
    Future.successful(())
  }

  /** Fetches at most `maxMessages` pending messages for the subscription identified by
    * `topic` and `config`, creating that subscription if it does not exist yet.
    *
    * The returned future fails if a message cannot be deserialized.
    */
  override def fetchMessages[A](
      topic: messaging.Topic[A],
      config: SubscriptionConfig,
      maxMessages: Int): Future[Seq[Message[A]]] = {
    val result = Promise[Seq[Message[A]]]()
    actor ! ActorMessages.Fetch(topic, config, maxMessages, result)
    result.future
  }

  /** Acknowledges the given messages so that they are not handed out again.
    *
    * The acknowledgement is handed to the bus actor before this method returns; the
    * returned future is already completed.
    */
  override def acknowledgeMessages(ids: Seq[MessageId]): Future[Unit] = {
    actor ! ActorMessages.Ack(ids)
    Future.successful(())
  }

}

object QueueBus {

  /** Creates a [[QueueBus]] running on the given actor system. */
  def apply(implicit system: ActorSystem): QueueBus = new QueueBus
}