package xyz.driver.core
package messaging
import java.nio.ByteBuffer
import akka.actor.{Actor, ActorSystem, Props}
import scala.collection.mutable
import scala.collection.mutable.Map
import scala.concurrent.duration._
import scala.concurrent.{Future, Promise}
/** A bus backed by an asynchronous, in-memory queue that is owned by a single actor.
  *
  * Note that this bus requires a local actor system and is intended for testing purposes only.
  *
  * Behaviour implemented by this class:
  *  - A subscription is identified by a topic name plus a subscription config. It is created the
  *    first time messages are fetched for it or acknowledged against it.
  *  - Publishing copies every message into each subscription that currently exists for the topic.
  *    If the topic has no subscription yet, the messages are dropped.
  *  - Fetching hands out the oldest queued messages first and remembers them as unacknowledged.
  *  - Once per second, every message that is still unacknowledged is appended to its mailbox
  *    again so that it will be redelivered by a later fetch.
  *
  * @param system actor system that hosts the queue actor and provides its scheduler and dispatcher
  */
class QueueBus(implicit system: ActorSystem) extends Bus {
  // Execution context required by the scheduler call inside BusMaster.
  import system.dispatcher

  override type SubscriptionConfig = Long
  override val defaultSubscriptionConfig: Long = 0

  /** Topic name, subscription config and a sequence number generated by the queue actor. */
  override type MessageId = (String, SubscriptionConfig, Long)
  type Message[A] = BasicMessage[A]

  /** Protocol understood by the queue actor. */
  private object ActorMessages {
    case class Send[A](topic: Topic[A], messages: Seq[A])
    case class Ack(messages: Seq[MessageId])
    case class Fetch[A](topic: Topic[A], cfg: SubscriptionConfig, max: Int, response: Promise[Seq[Message[A]]])
    case object Unack
  }

  /** Queue state of a single subscription.
    *
    * Both maps keep insertion order so that `mailbox.take(n)` yields the oldest messages first.
    */
  class Subscription {

    /** Messages waiting to be fetched, oldest first. */
    val mailbox: mutable.Map[MessageId, ByteBuffer] = mutable.LinkedHashMap.empty[MessageId, ByteBuffer]

    /** Messages that were fetched but not acknowledged yet, in fetch order. */
    val unacked: mutable.Map[MessageId, ByteBuffer] = mutable.LinkedHashMap.empty[MessageId, ByteBuffer]
  }

  /** Actor that owns all mutable queue state; every mutation happens inside `receive`. */
  private class BusMaster extends Actor {
    var _id = 0L

    /** Produces a fresh message id for the given subscription. */
    def nextId(topic: String, cfg: SubscriptionConfig): (String, SubscriptionConfig, Long) = {
      _id += 1
      (topic, cfg, _id)
    }

    /** Subscriptions per topic name, keyed by subscription config. */
    val topics: mutable.Map[String, mutable.Map[SubscriptionConfig, Subscription]] = mutable.Map.empty

    /** Returns the subscription for `topic` and `cfg`, creating the topic entry and the
      * subscription first if either does not exist yet.
      */
    def ensureSubscription(topic: String, cfg: SubscriptionConfig): Subscription =
      topics
        .getOrElseUpdate(topic, mutable.Map.empty[SubscriptionConfig, Subscription])
        .getOrElseUpdate(cfg, new Subscription)

    // Handle of the periodic redelivery tick, kept so that it can be cancelled in postStop.
    private var unackTimer: Option[akka.actor.Cancellable] = None

    override def preStart(): Unit = {
      unackTimer = Some(context.system.scheduler.schedule(1.seconds, 1.seconds) {
        self ! ActorMessages.Unack
      })
    }

    // Without this the scheduler would keep firing at a stopped actor, and every restart
    // (which runs preStart again) would add one more timer.
    override def postStop(): Unit = {
      unackTimer.foreach(_.cancel())
      unackTimer = None
    }

    override def receive: Receive = {
      case ActorMessages.Send(topic, messages) =>
        val buffers = messages.map(topic.serialize)
        // Only subscriptions that already exist receive a copy; otherwise nothing is stored.
        topics.get(topic.name).foreach { subscriptions =>
          for ((cfg, subscription) <- subscriptions; buffer <- buffers) {
            subscription.mailbox += nextId(topic.name, cfg) -> buffer
          }
        }

      case ActorMessages.Fetch(topic, cfg, max, promise) =>
        val subscription = ensureSubscription(topic.name, cfg)
        val messages     = subscription.mailbox.take(max)
        subscription.unacked ++= messages
        subscription.mailbox --= messages.keys
        // A failing deserializer must fail the caller's future instead of crashing this actor,
        // which would leave the promise uncompleted and lose all queued state on restart. The
        // affected messages stay in `unacked` and are re-queued by the next Unack tick.
        try {
          promise.success(messages.toSeq.map {
            case (ackId, buffer) =>
              new Message[Any] {
                val id   = ackId
                val data = topic.deserialize(buffer)
              }
          })
        } catch {
          case scala.util.control.NonFatal(e) => promise.tryFailure(e)
        }

      case ActorMessages.Ack(messageIds) =>
        for (id @ (topic, cfg, _) <- messageIds) {
          ensureSubscription(topic, cfg).unacked -= id
        }

      case ActorMessages.Unack =>
        // Put everything that has not been acknowledged back at the end of its mailbox.
        for {
          subscriptions <- topics.values
          subscription  <- subscriptions.values
        } {
          subscription.mailbox ++= subscription.unacked
          subscription.unacked.clear()
        }
    }
  }

  /** Reference to the actor holding the queue state. */
  val actor: akka.actor.ActorRef = system.actorOf(Props(new BusMaster))

  /** Hands the messages to the queue actor.
    *
    * The tell is non-blocking and is issued on the calling thread, so consecutive calls from one
    * thread reach the actor in call order. The returned future is already completed; it only
    * signals that the messages were handed over, not that they were stored or consumed.
    */
  override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = {
    actor ! ActorMessages.Send(topic, messages)
    Future.successful(())
  }

  /** Requests up to `maxMessages` queued messages for the given topic and subscription config.
    *
    * The future completes with the fetched messages (possibly none), or fails if a message
    * could not be deserialized.
    */
  override def fetchMessages[A](
      topic: messaging.Topic[A],
      config: SubscriptionConfig,
      maxMessages: Int): Future[Seq[Message[A]]] = {
    val result = Promise[Seq[Message[A]]]()
    actor ! ActorMessages.Fetch(topic, config, maxMessages, result)
    result.future
  }

  /** Marks the given messages as processed so that they are not redelivered.
    *
    * As with publishing, the request is handed to the queue actor on the calling thread and the
    * returned future is already completed.
    */
  override def acknowledgeMessages(ids: Seq[MessageId]): Future[Unit] = {
    actor ! ActorMessages.Ack(ids)
    Future.successful(())
  }
}
/** Companion offering a factory for [[QueueBus]]. */
object QueueBus {

  /** Creates a new [[QueueBus]] whose queue actor runs on the given actor system.
    *
    * @param system actor system to host the bus; resolved implicitly at the call site
    */
  def apply(implicit system: ActorSystem): QueueBus = new QueueBus()(system)
}