aboutsummaryrefslogtreecommitdiff
path: root/core-messaging
diff options
context:
space:
mode:
authorJakob Odersky <jakob@driver.xyz>2018-09-12 16:18:26 -0700
committerJakob Odersky <jakob@odersky.com>2018-10-09 16:19:39 -0700
commit7c755c77afbd67ae2ded9d8b004736d4e27e208f (patch)
treee93f4590165a338ed284adeb6f4a6bd43bb16b6a /core-messaging
parent76db2360364a35be31414a12cbc419a534a51744 (diff)
downloaddriver-core-7c755c77afbd67ae2ded9d8b004736d4e27e208f.tar.gz
driver-core-7c755c77afbd67ae2ded9d8b004736d4e27e208f.tar.bz2
driver-core-7c755c77afbd67ae2ded9d8b004736d4e27e208f.zip
Move storage and messaging to separate projects
Diffstat (limited to 'core-messaging')
-rw-r--r--core-messaging/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala157
-rw-r--r--core-messaging/src/main/scala/xyz/driver/core/messaging/Bus.scala74
-rw-r--r--core-messaging/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala49
-rw-r--r--core-messaging/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala267
-rw-r--r--core-messaging/src/main/scala/xyz/driver/core/messaging/QueueBus.scala126
-rw-r--r--core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala102
-rw-r--r--core-messaging/src/main/scala/xyz/driver/core/messaging/Topic.scala43
-rw-r--r--core-messaging/src/test/scala/xyz/driver/core/messaging/QueueBusTest.scala30
8 files changed, 848 insertions, 0 deletions
diff --git a/core-messaging/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala b/core-messaging/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala
new file mode 100644
index 0000000..8b7bca7
--- /dev/null
+++ b/core-messaging/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala
@@ -0,0 +1,157 @@
+package xyz.driver.core.messaging
+import java.nio.ByteBuffer
+import java.util
+
+import com.aliyun.mns.client.{AsyncCallback, CloudAccount}
+import com.aliyun.mns.common.ServiceException
+import com.aliyun.mns.model
+import com.aliyun.mns.model._
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future, Promise}
+
+class AliyunBus(
+ accountId: String,
+ accessId: String,
+ accessSecret: String,
+ region: String,
+ namespace: String,
+ pullTimeout: Int
+)(implicit val executionContext: ExecutionContext)
+ extends Bus {
+ private val endpoint = s"https://$accountId.mns.$region.aliyuncs.com"
+ private val cloudAccount = new CloudAccount(accessId, accessSecret, endpoint)
+ private val client = cloudAccount.getMNSClient
+
+ // When calling the asyncBatchPopMessage endpoint, alicloud returns an error if no message is received before the
+ // pullTimeout. This error is documented as MessageNotExist, however it's been observed to return InternalServerError
+ // occasionally. We mask both of these errors and return an empty list of messages
+ private val MaskedErrorCodes: Set[String] = Set("MessageNotExist", "InternalServerError")
+
+ override val defaultMaxMessages: Int = 10
+
+ case class MessageId(queueName: String, messageHandle: String)
+
+ case class Message[A](id: MessageId, data: A) extends BasicMessage[A]
+
+ case class SubscriptionConfig(
+ subscriptionPrefix: String = accessId,
+ ackTimeout: FiniteDuration = 10.seconds
+ )
+
+ override val defaultSubscriptionConfig: SubscriptionConfig = SubscriptionConfig()
+
+ private def rawTopicName(topic: Topic[_]) =
+ s"$namespace-${topic.name}"
+ private def rawSubscriptionName(config: SubscriptionConfig, topic: Topic[_]) =
+ s"$namespace-${config.subscriptionPrefix}-${topic.name}"
+
+ override def fetchMessages[A](
+ topic: Topic[A],
+ config: SubscriptionConfig,
+ maxMessages: Int): Future[Seq[Message[A]]] = {
+ import collection.JavaConverters._
+ val subscriptionName = rawSubscriptionName(config, topic)
+ val queueRef = client.getQueueRef(subscriptionName)
+
+ val promise = Promise[Seq[model.Message]]
+ queueRef.asyncBatchPopMessage(
+ maxMessages,
+ pullTimeout,
+ new AsyncCallback[util.List[model.Message]] {
+ override def onSuccess(result: util.List[model.Message]): Unit = {
+ promise.success(result.asScala)
+ }
+ override def onFail(ex: Exception): Unit = ex match {
+ case serviceException: ServiceException if MaskedErrorCodes(serviceException.getErrorCode) =>
+ promise.success(Nil)
+ case _ =>
+ promise.failure(ex)
+ }
+ }
+ )
+
+ promise.future.map(_.map { message =>
+ import scala.xml.XML
+ val messageId = MessageId(subscriptionName, message.getReceiptHandle)
+ val messageXML = XML.loadString(message.getMessageBodyAsRawString)
+ val messageNode = messageXML \ "Message"
+ val messageBytes = java.util.Base64.getDecoder.decode(messageNode.head.text)
+
+ val deserializedMessage = topic.deserialize(ByteBuffer.wrap(messageBytes))
+ Message(messageId, deserializedMessage)
+ })
+ }
+
+ override def acknowledgeMessages(messages: Seq[MessageId]): Future[Unit] = {
+ import collection.JavaConverters._
+ require(messages.nonEmpty, "Acknowledged message list must be non-empty")
+
+ val queueRef = client.getQueueRef(messages.head.queueName)
+
+ val promise = Promise[Unit]
+ queueRef.asyncBatchDeleteMessage(
+ messages.map(_.messageHandle).asJava,
+ new AsyncCallback[Void] {
+ override def onSuccess(result: Void): Unit = promise.success(())
+ override def onFail(ex: Exception): Unit = promise.failure(ex)
+ }
+ )
+
+ promise.future
+ }
+
+ override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = {
+ val topicRef = client.getTopicRef(rawTopicName(topic))
+
+ val publishMessages = messages.map { message =>
+ val promise = Promise[TopicMessage]
+
+ val topicMessage = new Base64TopicMessage
+ topicMessage.setMessageBody(topic.serialize(message).array())
+
+ topicRef.asyncPublishMessage(
+ topicMessage,
+ new AsyncCallback[TopicMessage] {
+ override def onSuccess(result: TopicMessage): Unit = promise.success(result)
+ override def onFail(ex: Exception): Unit = promise.failure(ex)
+ }
+ )
+
+ promise.future
+ }
+
+ Future.sequence(publishMessages).map(_ => ())
+ }
+
+ def createTopic(topic: Topic[_]): Future[Unit] = Future {
+ val topicName = rawTopicName(topic)
+ val topicExists = Option(client.listTopic(topicName, "", 1)).exists(!_.getResult.isEmpty)
+ if (!topicExists) {
+ val topicMeta = new TopicMeta
+ topicMeta.setTopicName(topicName)
+ client.createTopic(topicMeta)
+ }
+ }
+
+ def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = Future {
+ val subscriptionName = rawSubscriptionName(config, topic)
+ val queueExists = Option(client.listQueue(subscriptionName, "", 1)).exists(!_.getResult.isEmpty)
+
+ if (!queueExists) {
+ val topicName = rawTopicName(topic)
+ val topicRef = client.getTopicRef(topicName)
+
+ val queueMeta = new QueueMeta
+ queueMeta.setQueueName(subscriptionName)
+ queueMeta.setVisibilityTimeout(config.ackTimeout.toSeconds)
+ client.createQueue(queueMeta)
+
+ val subscriptionMeta = new SubscriptionMeta
+ subscriptionMeta.setSubscriptionName(subscriptionName)
+ subscriptionMeta.setTopicName(topicName)
+ subscriptionMeta.setEndpoint(topicRef.generateQueueEndpoint(subscriptionName))
+ topicRef.subscribe(subscriptionMeta)
+ }
+ }
+}
diff --git a/core-messaging/src/main/scala/xyz/driver/core/messaging/Bus.scala b/core-messaging/src/main/scala/xyz/driver/core/messaging/Bus.scala
new file mode 100644
index 0000000..75954f4
--- /dev/null
+++ b/core-messaging/src/main/scala/xyz/driver/core/messaging/Bus.scala
@@ -0,0 +1,74 @@
+package xyz.driver.core
+package messaging
+
+import scala.concurrent._
+import scala.language.higherKinds
+
+/** Base trait for representing message buses.
+ *
+ * Message buses are expected to provide "at least once" delivery semantics and are
+ * expected to retry delivery when a message remains unacknowledged for a reasonable
+ * amount of time. */
+trait Bus {
+
+ /** Type of unique message identifiers. Usually a string or UUID. */
+ type MessageId
+
+ /** Most general kind of message. Any implementation of a message bus must provide
+ * the fields and methods specified in this trait. */
+ trait BasicMessage[A] { self: Message[A] =>
+
+ /** All messages must have unique IDs so that they can be acknowledged unambiguously. */
+ def id: MessageId
+
+ /** Actual message content. */
+ def data: A
+
+ }
+
+ /** Actual type of messages provided by this bus. This must be a subtype of BasicMessage
+ * (as that defines the minimal required fields of a messages), but may be refined to
+ * provide bus-specific additional data. */
+ type Message[A] <: BasicMessage[A]
+
+ /** Type of a bus-specific configuration object can be used to tweak settings of subscriptions. */
+ type SubscriptionConfig
+
+ /** Default value for a subscription configuration. It is such that any service will have a unique subscription
+ * for every topic, shared among all its instances. */
+ val defaultSubscriptionConfig: SubscriptionConfig
+
+ /** Maximum amount of messages handled in a single retrieval call. */
+ val defaultMaxMessages = 64
+
+ /** Execution context that is used to query and dispatch messages from this bus. */
+ implicit val executionContext: ExecutionContext
+
+ /** Retrieve any new messages in the mailbox of a subscription.
+ *
+ * Any retrieved messages become "outstanding" and should not be returned by this function
+ * again until a reasonable (bus-specific) amount of time has passed and they remain unacknowledged.
+ * In that case, they will again be considered new and will be returned by this function.
+ *
+ * Note that although outstanding and acknowledged messages will eventually be removed from
+ * mailboxes, no guarantee can be made that a message will be delivered only once. */
+ def fetchMessages[A](
+ topic: Topic[A],
+ config: SubscriptionConfig = defaultSubscriptionConfig,
+ maxMessages: Int = defaultMaxMessages): Future[Seq[Message[A]]]
+
+ /** Acknowledge that a given series of messages has been handled and should
+ * not be delivered again.
+ *
+ * Note that messages become eventually acknowledged and hence may be delivered more than once.
+ * @see fetchMessages()
+ */
+ def acknowledgeMessages(messages: Seq[MessageId]): Future[Unit]
+
+ /** Send a series of messages to a topic.
+ *
+ * The returned future will complete once messages have been accepted to the underlying bus.
+ * No guarantee can be made of their delivery to subscribers. */
+ def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit]
+
+}
diff --git a/core-messaging/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala b/core-messaging/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala
new file mode 100644
index 0000000..1af5308
--- /dev/null
+++ b/core-messaging/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala
@@ -0,0 +1,49 @@
+package xyz.driver.core
+package messaging
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.async.Async.{async, await}
+import scala.concurrent.Future
+
+/** Utility mixin that will ensure that topics and subscriptions exist before
+ * attempting to read or write from or to them.
+ */
+trait CreateOnDemand extends Bus {
+
+ /** Create the given topic. This operation is idempotent and is expected to succeed if the topic
+ * already exists.
+ */
+ def createTopic(topic: Topic[_]): Future[Unit]
+
+ /** Create the given subscription. This operation is idempotent and is expected to succeed if the subscription
+ * already exists.
+ */
+ def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit]
+
+ private val createdTopics = new ConcurrentHashMap[Topic[_], Future[Unit]]
+ private val createdSubscriptions = new ConcurrentHashMap[(Topic[_], SubscriptionConfig), Future[Unit]]
+
+ private def ensureTopic(topic: Topic[_]) =
+ createdTopics.computeIfAbsent(topic, t => createTopic(t))
+
+ private def ensureSubscription(topic: Topic[_], config: SubscriptionConfig) =
+ createdSubscriptions.computeIfAbsent(topic -> config, {
+ case (t, c) => createSubscription(t, c)
+ })
+
+ abstract override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = async {
+ await(ensureTopic(topic))
+ await(super.publishMessages(topic, messages))
+ }
+
+ abstract override def fetchMessages[A](
+ topic: Topic[A],
+ config: SubscriptionConfig,
+ maxMessages: Int): Future[Seq[Message[A]]] = async {
+ await(ensureTopic(topic))
+ await(ensureSubscription(topic, config))
+ await(super.fetchMessages(topic, config, maxMessages))
+ }
+
+}
diff --git a/core-messaging/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala b/core-messaging/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala
new file mode 100644
index 0000000..b296c50
--- /dev/null
+++ b/core-messaging/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala
@@ -0,0 +1,267 @@
+package xyz.driver.core
+package messaging
+
+import java.nio.ByteBuffer
+import java.nio.file.{Files, Path, Paths}
+import java.security.Signature
+import java.time.Instant
+import java.util
+
+import com.google.auth.oauth2.ServiceAccountCredentials
+import com.softwaremill.sttp._
+import spray.json.DefaultJsonProtocol._
+import spray.json._
+
+import scala.async.Async.{async, await}
+import scala.concurrent._
+import scala.concurrent.duration._
+
+/** A message bus implemented by [[https://cloud.google.com/pubsub/docs/overview Google's Pub/Sub service.]]
+ *
+ * == Overview ==
+ *
+ * The Pub/Sub message system is focused around a few concepts: 'topics',
+ * 'subscriptions' and 'subscribers'. Messages are sent to ''topics'' which may
+ * have multiple ''subscriptions'' associated to them. Every subscription to a
+ * topic will receive all messages sent to the topic. Messages are enqueued in
+ * a subscription until they are acknowledged by a ''subscriber''. Multiple
+ * subscribers may be associated to a subscription, in which case messages will
+ * get delivered arbitrarily among them.
+ *
+ * Topics and subscriptions are named resources which can be specified in
+ * Pub/Sub's configuration and may be queried. Subscribers on the other hand,
+ * are ephemeral processes that query a subscription on a regular basis, handle any
+ * messages and acknowledge them.
+ *
+ * == Delivery semantics ==
+ *
+ * - at least once
+ * - no ordering
+ *
+ * == Retention ==
+ *
+ * - configurable retry delay for unacknowledged messages, defaults to 10s
+ * - undeliverable messages are kept for 7 days
+ *
+ * @param credentials Google cloud credentials, usually the same as used by a
+ * service. Must have admin access to topics and
+ * descriptions.
+ * @param namespace The namespace in which this bus is running. Will be used to
+ * determine the exact name of topics and subscriptions.
+ * @param pullTimeout Delay after which a call to fetchMessages() will return an
+ * empty list, assuming that no messages have been received.
+ * @param executionContext Execution context to run any blocking commands.
+ * @param backend sttp backend used to query Pub/Sub's HTTP API
+ */
+class GoogleBus(
+ credentials: ServiceAccountCredentials,
+ namespace: String,
+ pullTimeout: Duration = 90.seconds
+)(implicit val executionContext: ExecutionContext, backend: SttpBackend[Future, _])
+ extends Bus {
+ import GoogleBus.Protocol
+
+ case class MessageId(subscription: String, ackId: String)
+
+ case class PubsubMessage[A](id: MessageId, data: A, publishTime: Instant) extends super.BasicMessage[A]
+ type Message[A] = PubsubMessage[A]
+
+ /** Subscription-specific configuration
+ *
+ * @param subscriptionPrefix An identifier used to uniquely determine the name of Pub/Sub subscriptions.
+ * All messages sent to a subscription will be dispatched arbitrarily
+ * among any subscribers. Defaults to the email of the credentials used by this
+ * bus instance, thereby giving every service a unique subscription to every topic.
+ * To give every service instance a unique subscription, this must be changed to a
+ * unique value.
+ * @param ackTimeout Duration in which a message must be acknowledged before it is delivered again.
+ */
+ case class SubscriptionConfig(
+ subscriptionPrefix: String = credentials.getClientEmail.split("@")(0),
+ ackTimeout: FiniteDuration = 10.seconds
+ )
+ override val defaultSubscriptionConfig: SubscriptionConfig = SubscriptionConfig()
+
+ /** Obtain an authentication token valid for the given duration
+ * https://developers.google.com/identity/protocols/OAuth2ServiceAccount
+ */
+ private def freshAuthToken(duration: FiniteDuration): Future[String] = {
+ def jwt = {
+ val now = Instant.now().getEpochSecond
+ val base64 = util.Base64.getEncoder
+ val header = base64.encodeToString("""{"alg":"RS256","typ":"JWT"}""".getBytes("utf-8"))
+ val body = base64.encodeToString(
+ s"""|{
+ | "iss": "${credentials.getClientEmail}",
+ | "scope": "https://www.googleapis.com/auth/pubsub",
+ | "aud": "https://www.googleapis.com/oauth2/v4/token",
+ | "exp": ${now + duration.toSeconds},
+ | "iat": $now
+ |}""".stripMargin.getBytes("utf-8")
+ )
+ val signer = Signature.getInstance("SHA256withRSA")
+ signer.initSign(credentials.getPrivateKey)
+ signer.update(s"$header.$body".getBytes("utf-8"))
+ val signature = base64.encodeToString(signer.sign())
+ s"$header.$body.$signature"
+ }
+ sttp
+ .post(uri"https://www.googleapis.com/oauth2/v4/token")
+ .body(
+ "grant_type" -> "urn:ietf:params:oauth:grant-type:jwt-bearer",
+ "assertion" -> jwt
+ )
+ .mapResponse(s => s.parseJson.asJsObject.fields("access_token").convertTo[String])
+ .send()
+ .map(_.unsafeBody)
+ }
+
+ // the token is cached a few minutes less than its validity to diminish latency of concurrent accesses at renewal time
+ private val getToken: () => Future[String] = Refresh.every(55.minutes)(freshAuthToken(60.minutes))
+
+ private val baseUri = uri"https://pubsub.googleapis.com/"
+
+ private def rawTopicName(topic: Topic[_]) =
+ s"projects/${credentials.getProjectId}/topics/$namespace.${topic.name}"
+ private def rawSubscriptionName(config: SubscriptionConfig, topic: Topic[_]) =
+ s"projects/${credentials.getProjectId}/subscriptions/$namespace.${config.subscriptionPrefix}.${topic.name}"
+
+ def createTopic(topic: Topic[_]): Future[Unit] = async {
+ val request = sttp
+ .put(baseUri.path(s"v1/${rawTopicName(topic)}"))
+ .auth
+ .bearer(await(getToken()))
+ val result = await(request.send())
+ result.body match {
+ case Left(error) if result.code != 409 => // 409 <=> topic already exists, ignore it
+ throw new NoSuchElementException(s"Error creating topic: Status code ${result.code}: $error")
+ case _ => ()
+ }
+ }
+
+ def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = async {
+ val request = sttp
+ .put(baseUri.path(s"v1/${rawSubscriptionName(config, topic)}"))
+ .auth
+ .bearer(await(getToken()))
+ .body(
+ JsObject(
+ "topic" -> rawTopicName(topic).toJson,
+ "ackDeadlineSeconds" -> config.ackTimeout.toSeconds.toJson
+ ).compactPrint
+ )
+ val result = await(request.send())
+ result.body match {
+ case Left(error) if result.code != 409 => // 409 <=> subscription already exists, ignore it
+ throw new NoSuchElementException(s"Error creating subscription: Status code ${result.code}: $error")
+ case _ => ()
+ }
+ }
+
+ override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = async {
+ import Protocol.bufferFormat
+ val buffers: Seq[ByteBuffer] = messages.map(topic.serialize)
+ val request = sttp
+ .post(baseUri.path(s"v1/${rawTopicName(topic)}:publish"))
+ .auth
+ .bearer(await(getToken()))
+ .body(
+ JsObject("messages" -> buffers.map(buffer => JsObject("data" -> buffer.toJson)).toJson).compactPrint
+ )
+ await(request.send()).unsafeBody
+ ()
+ }
+
+ override def fetchMessages[A](
+ topic: Topic[A],
+ subscriptionConfig: SubscriptionConfig,
+ maxMessages: Int): Future[Seq[PubsubMessage[A]]] = async {
+ val subscription = rawSubscriptionName(subscriptionConfig, topic)
+ val request = sttp
+ .post(baseUri.path(s"v1/$subscription:pull"))
+ .auth
+ .bearer(await(getToken().map(x => x)))
+ .body(
+ JsObject(
+ "returnImmediately" -> JsFalse,
+ "maxMessages" -> JsNumber(maxMessages)
+ ).compactPrint
+ )
+ .readTimeout(pullTimeout)
+ .mapResponse(_.parseJson)
+
+ val messages = await(request.send()).unsafeBody match {
+ case JsObject(fields) if fields.isEmpty => Seq()
+ case obj => obj.convertTo[Protocol.SubscriptionPull].receivedMessages
+ }
+
+ messages.map { msg =>
+ PubsubMessage[A](
+ MessageId(subscription, msg.ackId),
+ topic.deserialize(msg.message.data),
+ msg.message.publishTime
+ )
+ }
+ }
+
+ override def acknowledgeMessages(messageIds: Seq[MessageId]): Future[Unit] = async {
+ val request = sttp
+ .post(baseUri.path(s"v1/${messageIds.head.subscription}:acknowledge"))
+ .auth
+ .bearer(await(getToken()))
+ .body(
+ JsObject("ackIds" -> JsArray(messageIds.toVector.map(m => JsString(m.ackId)))).compactPrint
+ )
+ await(request.send()).unsafeBody
+ ()
+ }
+
+}
+
+object GoogleBus {
+
+ private object Protocol extends DefaultJsonProtocol {
+ case class SubscriptionPull(receivedMessages: Seq[ReceivedMessage])
+ case class ReceivedMessage(ackId: String, message: PubsubMessage)
+ case class PubsubMessage(data: ByteBuffer, publishTime: Instant)
+
+ implicit val timeFormat: JsonFormat[Instant] = new JsonFormat[Instant] {
+ override def write(obj: Instant): JsValue = JsString(obj.toString)
+ override def read(json: JsValue): Instant = Instant.parse(json.convertTo[String])
+ }
+ implicit val bufferFormat: JsonFormat[ByteBuffer] = new JsonFormat[ByteBuffer] {
+ override def write(obj: ByteBuffer): JsValue =
+ JsString(util.Base64.getEncoder.encodeToString(obj.array()))
+
+ override def read(json: JsValue): ByteBuffer = {
+ val encodedBytes = json.convertTo[String].getBytes("utf-8")
+ val decodedBytes = util.Base64.getDecoder.decode(encodedBytes)
+ ByteBuffer.wrap(decodedBytes)
+ }
+ }
+
+ implicit val pubsubMessageFormat: RootJsonFormat[PubsubMessage] = jsonFormat2(PubsubMessage)
+ implicit val receivedMessageFormat: RootJsonFormat[ReceivedMessage] = jsonFormat2(ReceivedMessage)
+ implicit val subscrptionPullFormat: RootJsonFormat[SubscriptionPull] = jsonFormat1(SubscriptionPull)
+ }
+
+ def fromKeyfile(keyfile: Path, namespace: String)(
+ implicit executionContext: ExecutionContext,
+ backend: SttpBackend[Future, _]): GoogleBus = {
+ val creds = ServiceAccountCredentials.fromStream(Files.newInputStream(keyfile))
+ new GoogleBus(creds, namespace)
+ }
+
+ @deprecated(
+ "Reading from the environment adds opaque dependencies and hance leads to extra complexity. Use fromKeyfile instead.",
+ "driver-core 1.12.2")
+ def fromEnv(implicit executionContext: ExecutionContext, backend: SttpBackend[Future, _]): GoogleBus = {
+ def env(key: String) = {
+ require(sys.env.contains(key), s"Environment variable $key is not set.")
+ sys.env(key)
+ }
+ val keyfile = Paths.get(env("GOOGLE_APPLICATION_CREDENTIALS"))
+ fromKeyfile(keyfile, env("SERVICE_NAMESPACE"))
+ }
+
+}
diff --git a/core-messaging/src/main/scala/xyz/driver/core/messaging/QueueBus.scala b/core-messaging/src/main/scala/xyz/driver/core/messaging/QueueBus.scala
new file mode 100644
index 0000000..45c9ed5
--- /dev/null
+++ b/core-messaging/src/main/scala/xyz/driver/core/messaging/QueueBus.scala
@@ -0,0 +1,126 @@
+package xyz.driver.core
+package messaging
+
+import java.nio.ByteBuffer
+
+import akka.actor.{Actor, ActorSystem, Props}
+
+import scala.collection.mutable
+import scala.collection.mutable.Map
+import scala.concurrent.duration._
+import scala.concurrent.{Future, Promise}
+
+/** A bus backed by an asynchronous queue. Note that this bus requires a local actor system
+ * and is intended for testing purposes only. */
+class QueueBus(implicit system: ActorSystem) extends Bus {
+ import system.dispatcher
+
+ override type SubscriptionConfig = Long
+ override val defaultSubscriptionConfig: Long = 0
+ override val executionContext = system.dispatcher
+
+ override type MessageId = (String, SubscriptionConfig, Long)
+ type Message[A] = BasicMessage[A]
+
+ private object ActorMessages {
+ case class Send[A](topic: Topic[A], messages: Seq[A])
+ case class Ack(messages: Seq[MessageId])
+ case class Fetch[A](topic: Topic[A], cfg: SubscriptionConfig, max: Int, response: Promise[Seq[Message[A]]])
+ case object Unack
+ }
+
+ class Subscription {
+ val mailbox: mutable.Map[MessageId, ByteBuffer] = mutable.Map.empty
+ val unacked: mutable.Map[MessageId, ByteBuffer] = mutable.Map.empty
+ }
+
+ private class BusMaster extends Actor {
+ var _id = 0L
+ def nextId(topic: String, cfg: SubscriptionConfig): (String, SubscriptionConfig, Long) = {
+ _id += 1; (topic, cfg, _id)
+ }
+
+ val topics: mutable.Map[String, mutable.Map[SubscriptionConfig, Subscription]] = mutable.Map.empty
+
+ def ensureSubscription(topic: String, cfg: SubscriptionConfig): Unit = {
+ topics.get(topic) match {
+ case Some(t) =>
+ t.getOrElseUpdate(cfg, new Subscription)
+ case None =>
+ topics += topic -> mutable.Map.empty
+ ensureSubscription(topic, cfg)
+ }
+ }
+
+ override def preStart(): Unit = {
+ context.system.scheduler.schedule(1.seconds, 1.seconds) {
+ self ! ActorMessages.Unack
+ }
+ }
+
+ override def receive: Receive = {
+ case ActorMessages.Send(topic, messages) =>
+ val buffers = messages.map(topic.serialize)
+ val subscriptions = topics.getOrElse(topic.name, Map.empty)
+ for ((cfg, subscription) <- subscriptions) {
+ for (buffer <- buffers) {
+ subscription.mailbox += nextId(topic.name, cfg) -> buffer
+ }
+ }
+
+ case ActorMessages.Fetch(topic, cfg, max, promise) =>
+ ensureSubscription(topic.name, cfg)
+ val subscription = topics(topic.name)(cfg)
+ val messages = subscription.mailbox.take(max)
+ subscription.unacked ++= messages
+ subscription.mailbox --= messages.map(_._1)
+ promise.success(messages.toSeq.map {
+ case (ackId, buffer) =>
+ new Message[Any] {
+ val id = ackId
+ val data = topic.deserialize(buffer)
+ }
+ })
+
+ case ActorMessages.Ack(messageIds) =>
+ for (id @ (topic, cfg, _) <- messageIds) {
+ ensureSubscription(topic, cfg)
+ val subscription = topics(topic)(cfg)
+ subscription.unacked -= id
+ }
+
+ case ActorMessages.Unack =>
+ for ((_, subscriptions) <- topics) {
+ for ((_, subscription) <- subscriptions) {
+ subscription.mailbox ++= subscription.unacked
+ subscription.unacked.clear()
+ }
+ }
+ }
+
+ }
+
+ val actor = system.actorOf(Props(new BusMaster))
+
+ override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = Future {
+ actor ! ActorMessages.Send(topic, messages)
+ }
+
+ override def fetchMessages[A](
+ topic: messaging.Topic[A],
+ config: SubscriptionConfig,
+ maxMessages: Int): Future[Seq[Message[A]]] = {
+ val result = Promise[Seq[Message[A]]]
+ actor ! ActorMessages.Fetch(topic, config, maxMessages, result)
+ result.future
+ }
+
+ override def acknowledgeMessages(ids: Seq[MessageId]): Future[Unit] = Future {
+ actor ! ActorMessages.Ack(ids)
+ }
+
+}
+
+object QueueBus {
+ def apply(implicit system: ActorSystem) = new QueueBus
+}
diff --git a/core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala b/core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
new file mode 100644
index 0000000..44d75cd
--- /dev/null
+++ b/core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
@@ -0,0 +1,102 @@
+package xyz.driver.core
+package messaging
+
+import akka.NotUsed
+import akka.stream.Materializer
+import akka.stream.scaladsl.{Flow, RestartSource, Sink, Source}
+
+import scala.collection.mutable.ListBuffer
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+/** An extension to message buses that offers an Akka-Streams API.
+ *
+ * Example usage of a stream that subscribes to one topic, prints any messages
+ * it receives and finally acknowledges them
+ * their receipt.
+ * {{{
+ * val bus: StreamBus = ???
+ * val topic = Topic.string("topic")
+ * bus.subscribe(topic1)
+ * .map{ msg =>
+ * print(msg.data)
+ * msg
+ * }
+ * .to(bus.acknowledge)
+ * .run()
+ * }}} */
+trait StreamBus extends Bus {
+
+ /** Flow that publishes any messages to a given topic.
+ * Emits messages once they have been published to the underlying bus. */
+ def publish[A](topic: Topic[A]): Flow[A, A, NotUsed] = {
+ Flow[A]
+ .batch(defaultMaxMessages.toLong, a => ListBuffer[A](a))(_ += _)
+ .mapAsync(1) { a =>
+ publishMessages(topic, a).map(_ => a)
+ }
+ .mapConcat(list => list.toList)
+ }
+
+ /** Sink that acknowledges the receipt of a message. */
+ def acknowledge: Sink[MessageId, NotUsed] = {
+ Flow[MessageId]
+ .batch(defaultMaxMessages.toLong, a => ListBuffer[MessageId](a))(_ += _)
+ .mapAsync(1)(acknowledgeMessages(_))
+ .to(Sink.ignore)
+ }
+
+ /** Source that listens to a subscription and receives any messages sent to its topic. */
+ def subscribe[A](
+ topic: Topic[A],
+ config: SubscriptionConfig = defaultSubscriptionConfig): Source[Message[A], NotUsed] = {
+ Source
+ .unfoldAsync((topic, config))(
+ topicAndConfig =>
+ fetchMessages(topicAndConfig._1, topicAndConfig._2, defaultMaxMessages).map(msgs =>
+ Some(topicAndConfig -> msgs))
+ )
+ .filter(_.nonEmpty)
+ .mapConcat(messages => messages.toList)
+ }
+
+ def runWithRestart[A](
+ topic: Topic[A],
+ config: SubscriptionConfig = defaultSubscriptionConfig,
+ minBackoff: FiniteDuration = 3.seconds,
+ maxBackoff: FiniteDuration = 30.seconds,
+ randomFactor: Double = 0.2,
+ maxRestarts: Int = 20
+ )(processMessage: Flow[Message[A], List[MessageId], NotUsed])(implicit mat: Materializer): NotUsed = {
+ RestartSource
+ .withBackoff[MessageId](
+ minBackoff,
+ maxBackoff,
+ randomFactor,
+ maxRestarts
+ ) { () =>
+ subscribe(topic, config)
+ .via(processMessage)
+ .log(topic.name)
+ .mapConcat(identity)
+ }
+ .to(acknowledge)
+ .run()
+ }
+
+ def handleMessage[A](
+ topic: Topic[A],
+ config: SubscriptionConfig = defaultSubscriptionConfig,
+ parallelism: Int = 1,
+ minBackoff: FiniteDuration = 3.seconds,
+ maxBackoff: FiniteDuration = 30.seconds,
+ randomFactor: Double = 0.2,
+ maxRestarts: Int = 20
+ )(processMessage: A => Future[_])(implicit mat: Materializer): NotUsed = {
+ runWithRestart(topic, config, minBackoff, maxBackoff, randomFactor, maxRestarts) {
+ Flow[Message[A]].mapAsync(parallelism) { message =>
+ processMessage(message.data).map(_ => message.id :: Nil)
+ }
+ }
+ }
+}
diff --git a/core-messaging/src/main/scala/xyz/driver/core/messaging/Topic.scala b/core-messaging/src/main/scala/xyz/driver/core/messaging/Topic.scala
new file mode 100644
index 0000000..32fd764
--- /dev/null
+++ b/core-messaging/src/main/scala/xyz/driver/core/messaging/Topic.scala
@@ -0,0 +1,43 @@
+package xyz.driver.core
+package messaging
+
+import java.nio.ByteBuffer
+
+/** A topic is a named group of messages that all share a common schema.
+ * @tparam Message type of messages sent over this topic */
+trait Topic[Message] {
+
+ /** Name of this topic (must be unique). */
+ def name: String
+
+ /** Convert a message to its wire format that will be sent over a bus. */
+ def serialize(message: Message): ByteBuffer
+
+ /** Convert a message from its wire format. */
+ def deserialize(message: ByteBuffer): Message
+
+}
+
+object Topic {
+
+ /** Create a new "raw" topic without a schema, providing access to the underlying bytes of messages. */
+ def raw(name0: String): Topic[ByteBuffer] = new Topic[ByteBuffer] {
+ def name = name0
+ override def serialize(message: ByteBuffer): ByteBuffer = message
+ override def deserialize(message: ByteBuffer): ByteBuffer = message
+ }
+
+ /** Create a topic that represents data as UTF-8 encoded strings. */
+ def string(name0: String): Topic[String] = new Topic[String] {
+ def name = name0
+ override def serialize(message: String): ByteBuffer = {
+ ByteBuffer.wrap(message.getBytes("utf-8"))
+ }
+ override def deserialize(message: ByteBuffer): String = {
+ val bytes = new Array[Byte](message.remaining())
+ message.get(bytes)
+ new String(bytes, "utf-8")
+ }
+ }
+
+}
diff --git a/core-messaging/src/test/scala/xyz/driver/core/messaging/QueueBusTest.scala b/core-messaging/src/test/scala/xyz/driver/core/messaging/QueueBusTest.scala
new file mode 100644
index 0000000..8dd0776
--- /dev/null
+++ b/core-messaging/src/test/scala/xyz/driver/core/messaging/QueueBusTest.scala
@@ -0,0 +1,30 @@
+package xyz.driver.core.messaging
+
+import akka.actor.ActorSystem
+import org.scalatest.FlatSpec
+import org.scalatest.concurrent.ScalaFutures
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration._
+
+class QueueBusTest extends FlatSpec with ScalaFutures {
+ implicit val patience: PatienceConfig = PatienceConfig(timeout = 10.seconds)
+
+ def busBehaviour(bus: Bus)(implicit ec: ExecutionContext): Unit = {
+
+ it should "deliver messages to a subscriber" in {
+ val topic = Topic.string("test.topic1")
+ bus.fetchMessages(topic).futureValue
+ bus.publishMessages(topic, Seq("hello world!"))
+ Thread.sleep(100)
+ val messages = bus.fetchMessages(topic)
+ assert(messages.futureValue.map(_.data).toList == List("hello world!"))
+ }
+ }
+
+ implicit val system: ActorSystem = ActorSystem("queue-test")
+ import system.dispatcher
+
+ "A queue-based bus" should behave like busBehaviour(new QueueBus)
+
+}