From 7c755c77afbd67ae2ded9d8b004736d4e27e208f Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Wed, 12 Sep 2018 16:18:26 -0700 Subject: Move storage and messaging to separate projects --- .../xyz/driver/core/messaging/AliyunBus.scala | 157 ++++++++++++ .../main/scala/xyz/driver/core/messaging/Bus.scala | 74 ++++++ .../xyz/driver/core/messaging/CreateOnDemand.scala | 49 ++++ .../xyz/driver/core/messaging/GoogleBus.scala | 267 +++++++++++++++++++++ .../scala/xyz/driver/core/messaging/QueueBus.scala | 126 ++++++++++ .../xyz/driver/core/messaging/StreamBus.scala | 102 ++++++++ .../scala/xyz/driver/core/messaging/Topic.scala | 43 ++++ .../xyz/driver/core/messaging/QueueBusTest.scala | 30 +++ 8 files changed, 848 insertions(+) create mode 100644 core-messaging/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala create mode 100644 core-messaging/src/main/scala/xyz/driver/core/messaging/Bus.scala create mode 100644 core-messaging/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala create mode 100644 core-messaging/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala create mode 100644 core-messaging/src/main/scala/xyz/driver/core/messaging/QueueBus.scala create mode 100644 core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala create mode 100644 core-messaging/src/main/scala/xyz/driver/core/messaging/Topic.scala create mode 100644 core-messaging/src/test/scala/xyz/driver/core/messaging/QueueBusTest.scala (limited to 'core-messaging') diff --git a/core-messaging/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala b/core-messaging/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala new file mode 100644 index 0000000..8b7bca7 --- /dev/null +++ b/core-messaging/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala @@ -0,0 +1,157 @@ +package xyz.driver.core.messaging +import java.nio.ByteBuffer +import java.util + +import com.aliyun.mns.client.{AsyncCallback, CloudAccount} +import com.aliyun.mns.common.ServiceException +import com.aliyun.mns.model +import com.aliyun.mns.model._ + +import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, Future, Promise} + +class AliyunBus( + accountId: String, + accessId: String, + accessSecret: String, + region: String, + namespace: String, + pullTimeout: Int +)(implicit val executionContext: ExecutionContext) + extends Bus { + private val endpoint = s"https://$accountId.mns.$region.aliyuncs.com" + private val cloudAccount = new CloudAccount(accessId, accessSecret, endpoint) + private val client = cloudAccount.getMNSClient + + // When calling the asyncBatchPopMessage endpoint, alicloud returns an error if no message is received before the + // pullTimeout. This error is documented as MessageNotExist, however it's been observed to return InternalServerError + // occasionally. We mask both of these errors and return an empty list of messages + private val MaskedErrorCodes: Set[String] = Set("MessageNotExist", "InternalServerError") + + override val defaultMaxMessages: Int = 10 + + case class MessageId(queueName: String, messageHandle: String) + + case class Message[A](id: MessageId, data: A) extends BasicMessage[A] + + case class SubscriptionConfig( + subscriptionPrefix: String = accessId, + ackTimeout: FiniteDuration = 10.seconds + ) + + override val defaultSubscriptionConfig: SubscriptionConfig = SubscriptionConfig() + + private def rawTopicName(topic: Topic[_]) = + s"$namespace-${topic.name}" + private def rawSubscriptionName(config: SubscriptionConfig, topic: Topic[_]) = + s"$namespace-${config.subscriptionPrefix}-${topic.name}" + + override def fetchMessages[A]( + topic: Topic[A], + config: SubscriptionConfig, + maxMessages: Int): Future[Seq[Message[A]]] = { + import collection.JavaConverters._ + val subscriptionName = rawSubscriptionName(config, topic) + val queueRef = client.getQueueRef(subscriptionName) + + val promise = Promise[Seq[model.Message]] + queueRef.asyncBatchPopMessage( + maxMessages, + pullTimeout, + new AsyncCallback[util.List[model.Message]] { + override def onSuccess(result: util.List[model.Message]): Unit = { + promise.success(result.asScala) + } + override def onFail(ex: Exception): Unit = ex match { + case serviceException: ServiceException if MaskedErrorCodes(serviceException.getErrorCode) => + promise.success(Nil) + case _ => + promise.failure(ex) + } + } + ) + + promise.future.map(_.map { message => + import scala.xml.XML + val messageId = MessageId(subscriptionName, message.getReceiptHandle) + val messageXML = XML.loadString(message.getMessageBodyAsRawString) + val messageNode = messageXML \ "Message" + val messageBytes = java.util.Base64.getDecoder.decode(messageNode.head.text) + + val deserializedMessage = topic.deserialize(ByteBuffer.wrap(messageBytes)) + Message(messageId, deserializedMessage) + }) + } + + override def acknowledgeMessages(messages: Seq[MessageId]): Future[Unit] = { + import collection.JavaConverters._ + require(messages.nonEmpty, "Acknowledged message list must be non-empty") + + val queueRef = client.getQueueRef(messages.head.queueName) + + val promise = Promise[Unit] + queueRef.asyncBatchDeleteMessage( + messages.map(_.messageHandle).asJava, + new AsyncCallback[Void] { + override def onSuccess(result: Void): Unit = promise.success(()) + override def onFail(ex: Exception): Unit = promise.failure(ex) + } + ) + + promise.future + } + + override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = { + val topicRef = client.getTopicRef(rawTopicName(topic)) + + val publishMessages = messages.map { message => + val promise = Promise[TopicMessage] + + val topicMessage = new Base64TopicMessage + topicMessage.setMessageBody(topic.serialize(message).array()) + + topicRef.asyncPublishMessage( + topicMessage, + new AsyncCallback[TopicMessage] { + override def onSuccess(result: TopicMessage): Unit = promise.success(result) + override def onFail(ex: Exception): Unit = promise.failure(ex) + } + ) + + promise.future + } + + Future.sequence(publishMessages).map(_ => ()) + } + + def createTopic(topic: Topic[_]): Future[Unit] = Future { + val topicName = rawTopicName(topic) + val topicExists = Option(client.listTopic(topicName, "", 1)).exists(!_.getResult.isEmpty) + if (!topicExists) { + val topicMeta = new TopicMeta + topicMeta.setTopicName(topicName) + client.createTopic(topicMeta) + } + } + + def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = Future { + val subscriptionName = rawSubscriptionName(config, topic) + val queueExists = Option(client.listQueue(subscriptionName, "", 1)).exists(!_.getResult.isEmpty) + + if (!queueExists) { + val topicName = rawTopicName(topic) + val topicRef = client.getTopicRef(topicName) + + val queueMeta = new QueueMeta + queueMeta.setQueueName(subscriptionName) + queueMeta.setVisibilityTimeout(config.ackTimeout.toSeconds) + client.createQueue(queueMeta) + + val subscriptionMeta = new SubscriptionMeta + subscriptionMeta.setSubscriptionName(subscriptionName) + subscriptionMeta.setTopicName(topicName) + subscriptionMeta.setEndpoint(topicRef.generateQueueEndpoint(subscriptionName)) + topicRef.subscribe(subscriptionMeta) + } + } +} diff --git a/core-messaging/src/main/scala/xyz/driver/core/messaging/Bus.scala b/core-messaging/src/main/scala/xyz/driver/core/messaging/Bus.scala new file mode 100644 index 0000000..75954f4 --- /dev/null +++ b/core-messaging/src/main/scala/xyz/driver/core/messaging/Bus.scala @@ -0,0 +1,74 @@ +package xyz.driver.core +package messaging + +import scala.concurrent._ +import scala.language.higherKinds + +/** Base trait for representing message buses. + * + * Message buses are expected to provide "at least once" delivery semantics and are + * expected to retry delivery when a message remains unacknowledged for a reasonable + * amount of time. */ +trait Bus { + + /** Type of unique message identifiers. Usually a string or UUID. */ + type MessageId + + /** Most general kind of message. Any implementation of a message bus must provide + * the fields and methods specified in this trait. */ + trait BasicMessage[A] { self: Message[A] => + + /** All messages must have unique IDs so that they can be acknowledged unambiguously. */ + def id: MessageId + + /** Actual message content. */ + def data: A + + } + + /** Actual type of messages provided by this bus. This must be a subtype of BasicMessage + * (as that defines the minimal required fields of a messages), but may be refined to + * provide bus-specific additional data. */ + type Message[A] <: BasicMessage[A] + + /** Type of a bus-specific configuration object can be used to tweak settings of subscriptions. */ + type SubscriptionConfig + + /** Default value for a subscription configuration. It is such that any service will have a unique subscription + * for every topic, shared among all its instances. */ + val defaultSubscriptionConfig: SubscriptionConfig + + /** Maximum amount of messages handled in a single retrieval call. */ + val defaultMaxMessages = 64 + + /** Execution context that is used to query and dispatch messages from this bus. */ + implicit val executionContext: ExecutionContext + + /** Retrieve any new messages in the mailbox of a subscription. + * + * Any retrieved messages become "outstanding" and should not be returned by this function + * again until a reasonable (bus-specific) amount of time has passed and they remain unacknowledged. + * In that case, they will again be considered new and will be returned by this function. + * + * Note that although outstanding and acknowledged messages will eventually be removed from + * mailboxes, no guarantee can be made that a message will be delivered only once. */ + def fetchMessages[A]( + topic: Topic[A], + config: SubscriptionConfig = defaultSubscriptionConfig, + maxMessages: Int = defaultMaxMessages): Future[Seq[Message[A]]] + + /** Acknowledge that a given series of messages has been handled and should + * not be delivered again. + * + * Note that messages become eventually acknowledged and hence may be delivered more than once. + * @see fetchMessages() + */ + def acknowledgeMessages(messages: Seq[MessageId]): Future[Unit] + + /** Send a series of messages to a topic. + * + * The returned future will complete once messages have been accepted to the underlying bus. + * No guarantee can be made of their delivery to subscribers. */ + def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] + +} diff --git a/core-messaging/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala b/core-messaging/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala new file mode 100644 index 0000000..1af5308 --- /dev/null +++ b/core-messaging/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala @@ -0,0 +1,49 @@ +package xyz.driver.core +package messaging + +import java.util.concurrent.ConcurrentHashMap + +import scala.async.Async.{async, await} +import scala.concurrent.Future + +/** Utility mixin that will ensure that topics and subscriptions exist before + * attempting to read or write from or to them. + */ +trait CreateOnDemand extends Bus { + + /** Create the given topic. This operation is idempotent and is expected to succeed if the topic + * already exists. + */ + def createTopic(topic: Topic[_]): Future[Unit] + + /** Create the given subscription. This operation is idempotent and is expected to succeed if the subscription + * already exists. + */ + def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] + + private val createdTopics = new ConcurrentHashMap[Topic[_], Future[Unit]] + private val createdSubscriptions = new ConcurrentHashMap[(Topic[_], SubscriptionConfig), Future[Unit]] + + private def ensureTopic(topic: Topic[_]) = + createdTopics.computeIfAbsent(topic, t => createTopic(t)) + + private def ensureSubscription(topic: Topic[_], config: SubscriptionConfig) = + createdSubscriptions.computeIfAbsent(topic -> config, { + case (t, c) => createSubscription(t, c) + }) + + abstract override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = async { + await(ensureTopic(topic)) + await(super.publishMessages(topic, messages)) + } + + abstract override def fetchMessages[A]( + topic: Topic[A], + config: SubscriptionConfig, + maxMessages: Int): Future[Seq[Message[A]]] = async { + await(ensureTopic(topic)) + await(ensureSubscription(topic, config)) + await(super.fetchMessages(topic, config, maxMessages)) + } + +} diff --git a/core-messaging/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala b/core-messaging/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala new file mode 100644 index 0000000..b296c50 --- /dev/null +++ b/core-messaging/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala @@ -0,0 +1,267 @@ +package xyz.driver.core +package messaging + +import java.nio.ByteBuffer +import java.nio.file.{Files, Path, Paths} +import java.security.Signature +import java.time.Instant +import java.util + +import com.google.auth.oauth2.ServiceAccountCredentials +import com.softwaremill.sttp._ +import spray.json.DefaultJsonProtocol._ +import spray.json._ + +import scala.async.Async.{async, await} +import scala.concurrent._ +import scala.concurrent.duration._ + +/** A message bus implemented by [[https://cloud.google.com/pubsub/docs/overview Google's Pub/Sub service.]] + * + * == Overview == + * + * The Pub/Sub message system is focused around a few concepts: 'topics', + * 'subscriptions' and 'subscribers'. Messages are sent to ''topics'' which may + * have multiple ''subscriptions'' associated to them. Every subscription to a + * topic will receive all messages sent to the topic. Messages are enqueued in + * a subscription until they are acknowledged by a ''subscriber''. Multiple + * subscribers may be associated to a subscription, in which case messages will + * get delivered arbitrarily among them. + * + * Topics and subscriptions are named resources which can be specified in + * Pub/Sub's configuration and may be queried. Subscribers on the other hand, + * are ephemeral processes that query a subscription on a regular basis, handle any + * messages and acknowledge them. + * + * == Delivery semantics == + * + * - at least once + * - no ordering + * + * == Retention == + * + * - configurable retry delay for unacknowledged messages, defaults to 10s + * - undeliverable messages are kept for 7 days + * + * @param credentials Google cloud credentials, usually the same as used by a + * service. Must have admin access to topics and + * descriptions. + * @param namespace The namespace in which this bus is running. Will be used to + * determine the exact name of topics and subscriptions. + * @param pullTimeout Delay after which a call to fetchMessages() will return an + * empty list, assuming that no messages have been received. + * @param executionContext Execution context to run any blocking commands. + * @param backend sttp backend used to query Pub/Sub's HTTP API + */ +class GoogleBus( + credentials: ServiceAccountCredentials, + namespace: String, + pullTimeout: Duration = 90.seconds +)(implicit val executionContext: ExecutionContext, backend: SttpBackend[Future, _]) + extends Bus { + import GoogleBus.Protocol + + case class MessageId(subscription: String, ackId: String) + + case class PubsubMessage[A](id: MessageId, data: A, publishTime: Instant) extends super.BasicMessage[A] + type Message[A] = PubsubMessage[A] + + /** Subscription-specific configuration + * + * @param subscriptionPrefix An identifier used to uniquely determine the name of Pub/Sub subscriptions. + * All messages sent to a subscription will be dispatched arbitrarily + * among any subscribers. Defaults to the email of the credentials used by this + * bus instance, thereby giving every service a unique subscription to every topic. + * To give every service instance a unique subscription, this must be changed to a + * unique value. + * @param ackTimeout Duration in which a message must be acknowledged before it is delivered again. + */ + case class SubscriptionConfig( + subscriptionPrefix: String = credentials.getClientEmail.split("@")(0), + ackTimeout: FiniteDuration = 10.seconds + ) + override val defaultSubscriptionConfig: SubscriptionConfig = SubscriptionConfig() + + /** Obtain an authentication token valid for the given duration + * https://developers.google.com/identity/protocols/OAuth2ServiceAccount + */ + private def freshAuthToken(duration: FiniteDuration): Future[String] = { + def jwt = { + val now = Instant.now().getEpochSecond + val base64 = util.Base64.getEncoder + val header = base64.encodeToString("""{"alg":"RS256","typ":"JWT"}""".getBytes("utf-8")) + val body = base64.encodeToString( + s"""|{ + | "iss": "${credentials.getClientEmail}", + | "scope": "https://www.googleapis.com/auth/pubsub", + | "aud": "https://www.googleapis.com/oauth2/v4/token", + | "exp": ${now + duration.toSeconds}, + | "iat": $now + |}""".stripMargin.getBytes("utf-8") + ) + val signer = Signature.getInstance("SHA256withRSA") + signer.initSign(credentials.getPrivateKey) + signer.update(s"$header.$body".getBytes("utf-8")) + val signature = base64.encodeToString(signer.sign()) + s"$header.$body.$signature" + } + sttp + .post(uri"https://www.googleapis.com/oauth2/v4/token") + .body( + "grant_type" -> "urn:ietf:params:oauth:grant-type:jwt-bearer", + "assertion" -> jwt + ) + .mapResponse(s => s.parseJson.asJsObject.fields("access_token").convertTo[String]) + .send() + .map(_.unsafeBody) + } + + // the token is cached a few minutes less than its validity to diminish latency of concurrent accesses at renewal time + private val getToken: () => Future[String] = Refresh.every(55.minutes)(freshAuthToken(60.minutes)) + + private val baseUri = uri"https://pubsub.googleapis.com/" + + private def rawTopicName(topic: Topic[_]) = + s"projects/${credentials.getProjectId}/topics/$namespace.${topic.name}" + private def rawSubscriptionName(config: SubscriptionConfig, topic: Topic[_]) = + s"projects/${credentials.getProjectId}/subscriptions/$namespace.${config.subscriptionPrefix}.${topic.name}" + + def createTopic(topic: Topic[_]): Future[Unit] = async { + val request = sttp + .put(baseUri.path(s"v1/${rawTopicName(topic)}")) + .auth + .bearer(await(getToken())) + val result = await(request.send()) + result.body match { + case Left(error) if result.code != 409 => // 409 <=> topic already exists, ignore it + throw new NoSuchElementException(s"Error creating topic: Status code ${result.code}: $error") + case _ => () + } + } + + def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = async { + val request = sttp + .put(baseUri.path(s"v1/${rawSubscriptionName(config, topic)}")) + .auth + .bearer(await(getToken())) + .body( + JsObject( + "topic" -> rawTopicName(topic).toJson, + "ackDeadlineSeconds" -> config.ackTimeout.toSeconds.toJson + ).compactPrint + ) + val result = await(request.send()) + result.body match { + case Left(error) if result.code != 409 => // 409 <=> subscription already exists, ignore it + throw new NoSuchElementException(s"Error creating subscription: Status code ${result.code}: $error") + case _ => () + } + } + + override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = async { + import Protocol.bufferFormat + val buffers: Seq[ByteBuffer] = messages.map(topic.serialize) + val request = sttp + .post(baseUri.path(s"v1/${rawTopicName(topic)}:publish")) + .auth + .bearer(await(getToken())) + .body( + JsObject("messages" -> buffers.map(buffer => JsObject("data" -> buffer.toJson)).toJson).compactPrint + ) + await(request.send()).unsafeBody + () + } + + override def fetchMessages[A]( + topic: Topic[A], + subscriptionConfig: SubscriptionConfig, + maxMessages: Int): Future[Seq[PubsubMessage[A]]] = async { + val subscription = rawSubscriptionName(subscriptionConfig, topic) + val request = sttp + .post(baseUri.path(s"v1/$subscription:pull")) + .auth + .bearer(await(getToken().map(x => x))) + .body( + JsObject( + "returnImmediately" -> JsFalse, + "maxMessages" -> JsNumber(maxMessages) + ).compactPrint + ) + .readTimeout(pullTimeout) + .mapResponse(_.parseJson) + + val messages = await(request.send()).unsafeBody match { + case JsObject(fields) if fields.isEmpty => Seq() + case obj => obj.convertTo[Protocol.SubscriptionPull].receivedMessages + } + + messages.map { msg => + PubsubMessage[A]( + MessageId(subscription, msg.ackId), + topic.deserialize(msg.message.data), + msg.message.publishTime + ) + } + } + + override def acknowledgeMessages(messageIds: Seq[MessageId]): Future[Unit] = async { + val request = sttp + .post(baseUri.path(s"v1/${messageIds.head.subscription}:acknowledge")) + .auth + .bearer(await(getToken())) + .body( + JsObject("ackIds" -> JsArray(messageIds.toVector.map(m => JsString(m.ackId)))).compactPrint + ) + await(request.send()).unsafeBody + () + } + +} + +object GoogleBus { + + private object Protocol extends DefaultJsonProtocol { + case class SubscriptionPull(receivedMessages: Seq[ReceivedMessage]) + case class ReceivedMessage(ackId: String, message: PubsubMessage) + case class PubsubMessage(data: ByteBuffer, publishTime: Instant) + + implicit val timeFormat: JsonFormat[Instant] = new JsonFormat[Instant] { + override def write(obj: Instant): JsValue = JsString(obj.toString) + override def read(json: JsValue): Instant = Instant.parse(json.convertTo[String]) + } + implicit val bufferFormat: JsonFormat[ByteBuffer] = new JsonFormat[ByteBuffer] { + override def write(obj: ByteBuffer): JsValue = + JsString(util.Base64.getEncoder.encodeToString(obj.array())) + + override def read(json: JsValue): ByteBuffer = { + val encodedBytes = json.convertTo[String].getBytes("utf-8") + val decodedBytes = util.Base64.getDecoder.decode(encodedBytes) + ByteBuffer.wrap(decodedBytes) + } + } + + implicit val pubsubMessageFormat: RootJsonFormat[PubsubMessage] = jsonFormat2(PubsubMessage) + implicit val receivedMessageFormat: RootJsonFormat[ReceivedMessage] = jsonFormat2(ReceivedMessage) + implicit val subscrptionPullFormat: RootJsonFormat[SubscriptionPull] = jsonFormat1(SubscriptionPull) + } + + def fromKeyfile(keyfile: Path, namespace: String)( + implicit executionContext: ExecutionContext, + backend: SttpBackend[Future, _]): GoogleBus = { + val creds = ServiceAccountCredentials.fromStream(Files.newInputStream(keyfile)) + new GoogleBus(creds, namespace) + } + + @deprecated( + "Reading from the environment adds opaque dependencies and hance leads to extra complexity. Use fromKeyfile instead.", + "driver-core 1.12.2") + def fromEnv(implicit executionContext: ExecutionContext, backend: SttpBackend[Future, _]): GoogleBus = { + def env(key: String) = { + require(sys.env.contains(key), s"Environment variable $key is not set.") + sys.env(key) + } + val keyfile = Paths.get(env("GOOGLE_APPLICATION_CREDENTIALS")) + fromKeyfile(keyfile, env("SERVICE_NAMESPACE")) + } + +} diff --git a/core-messaging/src/main/scala/xyz/driver/core/messaging/QueueBus.scala b/core-messaging/src/main/scala/xyz/driver/core/messaging/QueueBus.scala new file mode 100644 index 0000000..45c9ed5 --- /dev/null +++ b/core-messaging/src/main/scala/xyz/driver/core/messaging/QueueBus.scala @@ -0,0 +1,126 @@ +package xyz.driver.core +package messaging + +import java.nio.ByteBuffer + +import akka.actor.{Actor, ActorSystem, Props} + +import scala.collection.mutable +import scala.collection.mutable.Map +import scala.concurrent.duration._ +import scala.concurrent.{Future, Promise} + +/** A bus backed by an asynchronous queue. Note that this bus requires a local actor system + * and is intended for testing purposes only. */ +class QueueBus(implicit system: ActorSystem) extends Bus { + import system.dispatcher + + override type SubscriptionConfig = Long + override val defaultSubscriptionConfig: Long = 0 + override val executionContext = system.dispatcher + + override type MessageId = (String, SubscriptionConfig, Long) + type Message[A] = BasicMessage[A] + + private object ActorMessages { + case class Send[A](topic: Topic[A], messages: Seq[A]) + case class Ack(messages: Seq[MessageId]) + case class Fetch[A](topic: Topic[A], cfg: SubscriptionConfig, max: Int, response: Promise[Seq[Message[A]]]) + case object Unack + } + + class Subscription { + val mailbox: mutable.Map[MessageId, ByteBuffer] = mutable.Map.empty + val unacked: mutable.Map[MessageId, ByteBuffer] = mutable.Map.empty + } + + private class BusMaster extends Actor { + var _id = 0L + def nextId(topic: String, cfg: SubscriptionConfig): (String, SubscriptionConfig, Long) = { + _id += 1; (topic, cfg, _id) + } + + val topics: mutable.Map[String, mutable.Map[SubscriptionConfig, Subscription]] = mutable.Map.empty + + def ensureSubscription(topic: String, cfg: SubscriptionConfig): Unit = { + topics.get(topic) match { + case Some(t) => + t.getOrElseUpdate(cfg, new Subscription) + case None => + topics += topic -> mutable.Map.empty + ensureSubscription(topic, cfg) + } + } + + override def preStart(): Unit = { + context.system.scheduler.schedule(1.seconds, 1.seconds) { + self ! ActorMessages.Unack + } + } + + override def receive: Receive = { + case ActorMessages.Send(topic, messages) => + val buffers = messages.map(topic.serialize) + val subscriptions = topics.getOrElse(topic.name, Map.empty) + for ((cfg, subscription) <- subscriptions) { + for (buffer <- buffers) { + subscription.mailbox += nextId(topic.name, cfg) -> buffer + } + } + + case ActorMessages.Fetch(topic, cfg, max, promise) => + ensureSubscription(topic.name, cfg) + val subscription = topics(topic.name)(cfg) + val messages = subscription.mailbox.take(max) + subscription.unacked ++= messages + subscription.mailbox --= messages.map(_._1) + promise.success(messages.toSeq.map { + case (ackId, buffer) => + new Message[Any] { + val id = ackId + val data = topic.deserialize(buffer) + } + }) + + case ActorMessages.Ack(messageIds) => + for (id @ (topic, cfg, _) <- messageIds) { + ensureSubscription(topic, cfg) + val subscription = topics(topic)(cfg) + subscription.unacked -= id + } + + case ActorMessages.Unack => + for ((_, subscriptions) <- topics) { + for ((_, subscription) <- subscriptions) { + subscription.mailbox ++= subscription.unacked + subscription.unacked.clear() + } + } + } + + } + + val actor = system.actorOf(Props(new BusMaster)) + + override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = Future { + actor ! ActorMessages.Send(topic, messages) + } + + override def fetchMessages[A]( + topic: messaging.Topic[A], + config: SubscriptionConfig, + maxMessages: Int): Future[Seq[Message[A]]] = { + val result = Promise[Seq[Message[A]]] + actor ! ActorMessages.Fetch(topic, config, maxMessages, result) + result.future + } + + override def acknowledgeMessages(ids: Seq[MessageId]): Future[Unit] = Future { + actor ! ActorMessages.Ack(ids) + } + +} + +object QueueBus { + def apply(implicit system: ActorSystem) = new QueueBus +} diff --git a/core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala b/core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala new file mode 100644 index 0000000..44d75cd --- /dev/null +++ b/core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala @@ -0,0 +1,102 @@ +package xyz.driver.core +package messaging + +import akka.NotUsed +import akka.stream.Materializer +import akka.stream.scaladsl.{Flow, RestartSource, Sink, Source} + +import scala.collection.mutable.ListBuffer +import scala.concurrent.Future +import scala.concurrent.duration._ + +/** An extension to message buses that offers an Akka-Streams API. + * + * Example usage of a stream that subscribes to one topic, prints any messages + * it receives and finally acknowledges them + * their receipt. + * {{{ + * val bus: StreamBus = ??? + * val topic = Topic.string("topic") + * bus.subscribe(topic1) + * .map{ msg => + * print(msg.data) + * msg + * } + * .to(bus.acknowledge) + * .run() + * }}} */ +trait StreamBus extends Bus { + + /** Flow that publishes any messages to a given topic. + * Emits messages once they have been published to the underlying bus. */ + def publish[A](topic: Topic[A]): Flow[A, A, NotUsed] = { + Flow[A] + .batch(defaultMaxMessages.toLong, a => ListBuffer[A](a))(_ += _) + .mapAsync(1) { a => + publishMessages(topic, a).map(_ => a) + } + .mapConcat(list => list.toList) + } + + /** Sink that acknowledges the receipt of a message. */ + def acknowledge: Sink[MessageId, NotUsed] = { + Flow[MessageId] + .batch(defaultMaxMessages.toLong, a => ListBuffer[MessageId](a))(_ += _) + .mapAsync(1)(acknowledgeMessages(_)) + .to(Sink.ignore) + } + + /** Source that listens to a subscription and receives any messages sent to its topic. */ + def subscribe[A]( + topic: Topic[A], + config: SubscriptionConfig = defaultSubscriptionConfig): Source[Message[A], NotUsed] = { + Source + .unfoldAsync((topic, config))( + topicAndConfig => + fetchMessages(topicAndConfig._1, topicAndConfig._2, defaultMaxMessages).map(msgs => + Some(topicAndConfig -> msgs)) + ) + .filter(_.nonEmpty) + .mapConcat(messages => messages.toList) + } + + def runWithRestart[A]( + topic: Topic[A], + config: SubscriptionConfig = defaultSubscriptionConfig, + minBackoff: FiniteDuration = 3.seconds, + maxBackoff: FiniteDuration = 30.seconds, + randomFactor: Double = 0.2, + maxRestarts: Int = 20 + )(processMessage: Flow[Message[A], List[MessageId], NotUsed])(implicit mat: Materializer): NotUsed = { + RestartSource + .withBackoff[MessageId]( + minBackoff, + maxBackoff, + randomFactor, + maxRestarts + ) { () => + subscribe(topic, config) + .via(processMessage) + .log(topic.name) + .mapConcat(identity) + } + .to(acknowledge) + .run() + } + + def handleMessage[A]( + topic: Topic[A], + config: SubscriptionConfig = defaultSubscriptionConfig, + parallelism: Int = 1, + minBackoff: FiniteDuration = 3.seconds, + maxBackoff: FiniteDuration = 30.seconds, + randomFactor: Double = 0.2, + maxRestarts: Int = 20 + )(processMessage: A => Future[_])(implicit mat: Materializer): NotUsed = { + runWithRestart(topic, config, minBackoff, maxBackoff, randomFactor, maxRestarts) { + Flow[Message[A]].mapAsync(parallelism) { message => + processMessage(message.data).map(_ => message.id :: Nil) + } + } + } +} diff --git a/core-messaging/src/main/scala/xyz/driver/core/messaging/Topic.scala b/core-messaging/src/main/scala/xyz/driver/core/messaging/Topic.scala new file mode 100644 index 0000000..32fd764 --- /dev/null +++ b/core-messaging/src/main/scala/xyz/driver/core/messaging/Topic.scala @@ -0,0 +1,43 @@ +package xyz.driver.core +package messaging + +import java.nio.ByteBuffer + +/** A topic is a named group of messages that all share a common schema. + * @tparam Message type of messages sent over this topic */ +trait Topic[Message] { + + /** Name of this topic (must be unique). */ + def name: String + + /** Convert a message to its wire format that will be sent over a bus. */ + def serialize(message: Message): ByteBuffer + + /** Convert a message from its wire format. */ + def deserialize(message: ByteBuffer): Message + +} + +object Topic { + + /** Create a new "raw" topic without a schema, providing access to the underlying bytes of messages. */ + def raw(name0: String): Topic[ByteBuffer] = new Topic[ByteBuffer] { + def name = name0 + override def serialize(message: ByteBuffer): ByteBuffer = message + override def deserialize(message: ByteBuffer): ByteBuffer = message + } + + /** Create a topic that represents data as UTF-8 encoded strings. */ + def string(name0: String): Topic[String] = new Topic[String] { + def name = name0 + override def serialize(message: String): ByteBuffer = { + ByteBuffer.wrap(message.getBytes("utf-8")) + } + override def deserialize(message: ByteBuffer): String = { + val bytes = new Array[Byte](message.remaining()) + message.get(bytes) + new String(bytes, "utf-8") + } + } + +} diff --git a/core-messaging/src/test/scala/xyz/driver/core/messaging/QueueBusTest.scala b/core-messaging/src/test/scala/xyz/driver/core/messaging/QueueBusTest.scala new file mode 100644 index 0000000..8dd0776 --- /dev/null +++ b/core-messaging/src/test/scala/xyz/driver/core/messaging/QueueBusTest.scala @@ -0,0 +1,30 @@ +package xyz.driver.core.messaging + +import akka.actor.ActorSystem +import org.scalatest.FlatSpec +import org.scalatest.concurrent.ScalaFutures + +import scala.concurrent.ExecutionContext +import scala.concurrent.duration._ + +class QueueBusTest extends FlatSpec with ScalaFutures { + implicit val patience: PatienceConfig = PatienceConfig(timeout = 10.seconds) + + def busBehaviour(bus: Bus)(implicit ec: ExecutionContext): Unit = { + + it should "deliver messages to a subscriber" in { + val topic = Topic.string("test.topic1") + bus.fetchMessages(topic).futureValue + bus.publishMessages(topic, Seq("hello world!")) + Thread.sleep(100) + val messages = bus.fetchMessages(topic) + assert(messages.futureValue.map(_.data).toList == List("hello world!")) + } + } + + implicit val system: ActorSystem = ActorSystem("queue-test") + import system.dispatcher + + "A queue-based bus" should behave like busBehaviour(new QueueBus) + +} -- cgit v1.2.3