aboutsummaryrefslogblamecommitdiff
path: root/src/main/scala/xyz/driver/core/messaging/QueueBus.scala
blob: d2ad073c0363971f7b0f78fe89b8090f5e85ca50 (plain) (tree)




























































































































                                                                                                               
package xyz.driver.core
package messaging

import java.nio.ByteBuffer

import akka.actor.{Actor, ActorSystem, Props}

import scala.collection.mutable
import scala.collection.mutable.Map
import scala.concurrent.duration._
import scala.concurrent.{Future, Promise}

/** A bus backed by an asynchronous queue. Note that this bus requires a local actor system
  * and is intended for testing purposes only. */
class QueueBus(implicit system: ActorSystem) extends Bus {
  import system.dispatcher

  override type SubscriptionConfig = Long
  override val defaultSubscriptionConfig: Long = 0

  override type MessageId = (String, SubscriptionConfig, Long)
  type Message[A]         = BasicMessage[A]

  private object ActorMessages {
    case class Send[A](topic: Topic[A], messages: Seq[A])
    case class Ack(messages: Seq[MessageId])
    case class Fetch[A](topic: Topic[A], cfg: SubscriptionConfig, max: Int, response: Promise[Seq[Message[A]]])
    case object Unack
  }

  class Subscription {
    val mailbox: mutable.Map[MessageId, ByteBuffer] = mutable.Map.empty
    val unacked: mutable.Map[MessageId, ByteBuffer] = mutable.Map.empty
  }

  private class BusMaster extends Actor {
    var _id = 0L
    def nextId(topic: String, cfg: SubscriptionConfig): (String, SubscriptionConfig, Long) = {
      _id += 1; (topic, cfg, _id)
    }

    val topics: mutable.Map[String, mutable.Map[SubscriptionConfig, Subscription]] = mutable.Map.empty

    def ensureSubscription(topic: String, cfg: SubscriptionConfig): Unit = {
      topics.get(topic) match {
        case Some(t) =>
          t.getOrElseUpdate(cfg, new Subscription)
        case None =>
          topics += topic -> mutable.Map.empty
          ensureSubscription(topic, cfg)
      }
    }

    override def preStart(): Unit = {
      context.system.scheduler.schedule(1.seconds, 1.seconds) {
        self ! ActorMessages.Unack
      }
    }

    override def receive: Receive = {
      case ActorMessages.Send(topic, messages) =>
        val buffers       = messages.map(topic.serialize)
        val subscriptions = topics.getOrElse(topic.name, Map.empty)
        for ((cfg, subscription) <- subscriptions) {
          for (buffer <- buffers) {
            subscription.mailbox += nextId(topic.name, cfg) -> buffer
          }
        }

      case ActorMessages.Fetch(topic, cfg, max, promise) =>
        ensureSubscription(topic.name, cfg)
        val subscription = topics(topic.name)(cfg)
        val messages     = subscription.mailbox.take(max)
        subscription.unacked ++= messages
        subscription.mailbox --= messages.map(_._1)
        promise.success(messages.toSeq.map {
          case (ackId, buffer) =>
            new Message[Any] {
              val id   = ackId
              val data = topic.deserialize(buffer)
            }
        })

      case ActorMessages.Ack(messageIds) =>
        for (id @ (topic, cfg, _) <- messageIds) {
          ensureSubscription(topic, cfg)
          val subscription = topics(topic)(cfg)
          subscription.unacked -= id
        }

      case ActorMessages.Unack =>
        for ((_, subscriptions) <- topics) {
          for ((_, subscription) <- subscriptions) {
            subscription.mailbox ++= subscription.unacked
            subscription.unacked.clear()
          }
        }
    }

  }

  val actor = system.actorOf(Props(new BusMaster))

  override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = Future {
    actor ! ActorMessages.Send(topic, messages)
  }

  override def fetchMessages[A](
      topic: messaging.Topic[A],
      config: SubscriptionConfig,
      maxMessages: Int): Future[Seq[Message[A]]] = {
    val result = Promise[Seq[Message[A]]]
    actor ! ActorMessages.Fetch(topic, config, maxMessages, result)
    result.future
  }

  override def acknowledgeMessages(ids: Seq[MessageId]): Future[Unit] = Future {
    actor ! ActorMessages.Ack(ids)
  }

}

object QueueBus {
  def apply(implicit system: ActorSystem) = new QueueBus
}