From 7c755c77afbd67ae2ded9d8b004736d4e27e208f Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Wed, 12 Sep 2018 16:18:26 -0700 Subject: Move storage and messaging to separate projects --- .../scala/xyz/driver/core/messaging/QueueBus.scala | 126 +++++++++++++++++++++ 1 file changed, 126 insertions(+) create mode 100644 core-messaging/src/main/scala/xyz/driver/core/messaging/QueueBus.scala (limited to 'core-messaging/src/main/scala/xyz/driver/core/messaging/QueueBus.scala') diff --git a/core-messaging/src/main/scala/xyz/driver/core/messaging/QueueBus.scala b/core-messaging/src/main/scala/xyz/driver/core/messaging/QueueBus.scala new file mode 100644 index 0000000..45c9ed5 --- /dev/null +++ b/core-messaging/src/main/scala/xyz/driver/core/messaging/QueueBus.scala @@ -0,0 +1,126 @@ +package xyz.driver.core +package messaging + +import java.nio.ByteBuffer + +import akka.actor.{Actor, ActorSystem, Props} + +import scala.collection.mutable +import scala.collection.mutable.Map +import scala.concurrent.duration._ +import scala.concurrent.{Future, Promise} + +/** A bus backed by an asynchronous queue. Note that this bus requires a local actor system + * and is intended for testing purposes only. */ +class QueueBus(implicit system: ActorSystem) extends Bus { + import system.dispatcher + + override type SubscriptionConfig = Long + override val defaultSubscriptionConfig: Long = 0 + override val executionContext = system.dispatcher + + override type MessageId = (String, SubscriptionConfig, Long) + type Message[A] = BasicMessage[A] + + private object ActorMessages { + case class Send[A](topic: Topic[A], messages: Seq[A]) + case class Ack(messages: Seq[MessageId]) + case class Fetch[A](topic: Topic[A], cfg: SubscriptionConfig, max: Int, response: Promise[Seq[Message[A]]]) + case object Unack + } + + class Subscription { + val mailbox: mutable.Map[MessageId, ByteBuffer] = mutable.Map.empty + val unacked: mutable.Map[MessageId, ByteBuffer] = mutable.Map.empty + } + + private class BusMaster extends Actor { + var _id = 0L + def nextId(topic: String, cfg: SubscriptionConfig): (String, SubscriptionConfig, Long) = { + _id += 1; (topic, cfg, _id) + } + + val topics: mutable.Map[String, mutable.Map[SubscriptionConfig, Subscription]] = mutable.Map.empty + + def ensureSubscription(topic: String, cfg: SubscriptionConfig): Unit = { + topics.get(topic) match { + case Some(t) => + t.getOrElseUpdate(cfg, new Subscription) + case None => + topics += topic -> mutable.Map.empty + ensureSubscription(topic, cfg) + } + } + + override def preStart(): Unit = { + context.system.scheduler.schedule(1.seconds, 1.seconds) { + self ! ActorMessages.Unack + } + } + + override def receive: Receive = { + case ActorMessages.Send(topic, messages) => + val buffers = messages.map(topic.serialize) + val subscriptions = topics.getOrElse(topic.name, Map.empty) + for ((cfg, subscription) <- subscriptions) { + for (buffer <- buffers) { + subscription.mailbox += nextId(topic.name, cfg) -> buffer + } + } + + case ActorMessages.Fetch(topic, cfg, max, promise) => + ensureSubscription(topic.name, cfg) + val subscription = topics(topic.name)(cfg) + val messages = subscription.mailbox.take(max) + subscription.unacked ++= messages + subscription.mailbox --= messages.map(_._1) + promise.success(messages.toSeq.map { + case (ackId, buffer) => + new Message[Any] { + val id = ackId + val data = topic.deserialize(buffer) + } + }) + + case ActorMessages.Ack(messageIds) => + for (id @ (topic, cfg, _) <- messageIds) { + ensureSubscription(topic, cfg) + val subscription = topics(topic)(cfg) + subscription.unacked -= id + } + + case ActorMessages.Unack => + for ((_, subscriptions) <- topics) { + for ((_, subscription) <- subscriptions) { + subscription.mailbox ++= subscription.unacked + subscription.unacked.clear() + } + } + } + + } + + val actor = system.actorOf(Props(new BusMaster)) + + override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = Future { + actor ! ActorMessages.Send(topic, messages) + } + + override def fetchMessages[A]( + topic: messaging.Topic[A], + config: SubscriptionConfig, + maxMessages: Int): Future[Seq[Message[A]]] = { + val result = Promise[Seq[Message[A]]] + actor ! ActorMessages.Fetch(topic, config, maxMessages, result) + result.future + } + + override def acknowledgeMessages(ids: Seq[MessageId]): Future[Unit] = Future { + actor ! ActorMessages.Ack(ids) + } + +} + +object QueueBus { + def apply(implicit system: ActorSystem) = new QueueBus +} -- cgit v1.2.3