From e6552f3b31b55396c652c196c5c3a9c3a6cfed71 Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Tue, 31 Jul 2018 12:13:47 -0700 Subject: Add message bus and topic abstractions (#181) --- build.sbt | 4 +- project/plugins.sbt | 2 +- src/main/scala/xyz/driver/core/Refresh.scala | 56 +++++ src/main/scala/xyz/driver/core/messaging/Bus.scala | 70 ++++++ .../driver/core/messaging/CreateBeforeStream.scala | 32 +++ .../xyz/driver/core/messaging/GoogleBus.scala | 258 +++++++++++++++++++++ .../scala/xyz/driver/core/messaging/QueueBus.scala | 125 ++++++++++ .../xyz/driver/core/messaging/StreamBus.scala | 59 +++++ .../scala/xyz/driver/core/messaging/Topic.scala | 43 ++++ src/main/scala/xyz/driver/core/pubsub.scala | 1 + .../xyz/driver/core/messaging/QueueBusTest.scala | 30 +++ 11 files changed, 678 insertions(+), 2 deletions(-) create mode 100644 src/main/scala/xyz/driver/core/Refresh.scala create mode 100644 src/main/scala/xyz/driver/core/messaging/Bus.scala create mode 100644 src/main/scala/xyz/driver/core/messaging/CreateBeforeStream.scala create mode 100644 src/main/scala/xyz/driver/core/messaging/GoogleBus.scala create mode 100644 src/main/scala/xyz/driver/core/messaging/QueueBus.scala create mode 100644 src/main/scala/xyz/driver/core/messaging/StreamBus.scala create mode 100644 src/main/scala/xyz/driver/core/messaging/Topic.scala create mode 100644 src/test/scala/xyz/driver/core/messaging/QueueBusTest.scala diff --git a/build.sbt b/build.sbt index 0d95a2b..e8d9ba7 100644 --- a/build.sbt +++ b/build.sbt @@ -8,6 +8,8 @@ lazy val core = (project in file(".")) .settings(lintingSettings ++ formatSettings) .settings(libraryDependencies ++= Seq( "xyz.driver" %% "tracing" % "0.1.2", + "com.softwaremill.sttp" %% "core" % "1.2.2", + "com.softwaremill.sttp" %% "akka-http-backend" % "1.2.2", "com.typesafe.akka" %% "akka-actor" % "2.5.13", "com.typesafe.akka" %% "akka-stream" % "2.5.13", "com.typesafe.akka" %% "akka-http-core" % akkaHttpV, @@ -18,6 +20,7 @@ lazy val core = (project in file(".")) "io.kamon" %% "kamon-statsd" % "1.0.0", "io.kamon" %% "kamon-system-metrics" % "1.0.0", "io.kamon" %% "kamon-akka-2.5" % "1.0.0", + "org.scala-lang.modules" %% "scala-async" % "0.9.7", "org.scalatest" %% "scalatest" % "3.0.5" % "test", "org.scalacheck" %% "scalacheck" % "1.14.0" % "test", "org.scalaz" %% "scalaz-core" % "7.2.24", @@ -37,4 +40,3 @@ lazy val core = (project in file(".")) "com.googlecode.libphonenumber" % "libphonenumber" % "8.9.7", "javax.xml.bind" % "jaxb-api" % "2.2.8" )) - .settings(scalaVersion := "2.12.6") diff --git a/project/plugins.sbt b/project/plugins.sbt index 22f8d96..6ff98d8 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,3 +1,3 @@ resolvers += "releases" at "https://drivergrp.jfrog.io/drivergrp/releases" -addSbtPlugin("xyz.driver" % "sbt-settings" % "1.0.11") +addSbtPlugin("xyz.driver" % "sbt-settings" % "1.0.15") diff --git a/src/main/scala/xyz/driver/core/Refresh.scala b/src/main/scala/xyz/driver/core/Refresh.scala new file mode 100644 index 0000000..ca28da7 --- /dev/null +++ b/src/main/scala/xyz/driver/core/Refresh.scala @@ -0,0 +1,56 @@ +package xyz.driver.core + +import java.time.Instant +import java.util.concurrent.atomic.AtomicReference + +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.concurrent.duration.Duration + +/** A single-value asynchronous cache with TTL. + * + * Slightly adapted from Twitter's "util" library + * https://github.com/twitter/util/blob/ae0ab09134414438af9dfaa88a4613cecbff4741/util-cache/src/main/scala/com/twitter/cache/Refresh.scala + * + * Released under the Apache License 2.0. + */ +object Refresh { + + /** Creates a function that will provide a cached value for a given time-to-live (TTL). + * + * {{{ + * def freshToken(): Future[String] = // expensive network call to get an access token + * val getToken: Future[String] = Refresh.every(1.hour)(freshToken()) + * + * getToken() // new token is issued + * getToken() // subsequent calls use the cached token + * // wait 1 hour + * getToken() // new token is issued + * }}} */ + def every[A](ttl: Duration)(compute: => Future[A])(implicit ec: ExecutionContext): () => Future[A] = { + val ref = new AtomicReference[(Future[A], Instant)]( + (Future.failed(new NoSuchElementException("Cached value was never computed")), Instant.MIN) + ) + def refresh(): Future[A] = { + val tuple = ref.get + val (cachedValue, lastRetrieved) = tuple + val now = Instant.now + if (now.getEpochSecond < lastRetrieved.getEpochSecond + ttl.toSeconds) { + cachedValue + } else { + val p = Promise[A] + val nextTuple = (p.future, now) + if (ref.compareAndSet(tuple, nextTuple)) { + compute.onComplete { done => + if (done.isFailure) { + ref.set((p.future, lastRetrieved)) // don't update retrieval time in case of failure + } + p.complete(done) + } + } + refresh() + } + } + refresh _ + } + +} diff --git a/src/main/scala/xyz/driver/core/messaging/Bus.scala b/src/main/scala/xyz/driver/core/messaging/Bus.scala new file mode 100644 index 0000000..e2ee76a --- /dev/null +++ b/src/main/scala/xyz/driver/core/messaging/Bus.scala @@ -0,0 +1,70 @@ +package xyz.driver.core +package messaging + +import scala.concurrent._ + +/** Base trait for representing message buses. + * + * Message buses are expected to provide "at least once" delivery semantics and are + * expected to retry delivery when a message remains unacknowledged for a reasonable + * amount of time. */ +trait Bus { + + /** Type of unique message identifiers. Usually a string or UUID. */ + type MessageId + + /** Most general kind of message. Any implementation of a message bus must provide + * the fields and methods specified in this trait. */ + trait BasicMessage[A] { + + /** All messages must have unique IDs so that they can be acknowledged unambiguously. */ + def id: MessageId + + /** Actual message content. */ + def data: A + + } + + /** Actual type of messages provided by this bus. This must be a subtype of BasicMessage + * (as that defines the minimal required fields of a messages), but may be refined to + * provide bus-specific additional data. */ + type Message[A] <: BasicMessage[A] + + /** Type of a bus-specific configuration object can be used to tweak settings of subscriptions. */ + type SubscriptionConfig + + /** Default value for a subscription configuration. It is such that any service will have a unique subscription + * for every topic, shared among all its instances. */ + val defaultSubscriptionConfig: SubscriptionConfig + + /** Maximum amount of messages handled in a single retrieval call. */ + val defaultMaxMessages = 64 + + /** Retrieve any new messages in the mailbox of a subscription. + * + * Any retrieved messages become "outstanding" and should not be returned by this function + * again until a reasonable (bus-specific) amount of time has passed and they remain unacknowledged. + * In that case, they will again be considered new and will be returned by this function. + * + * Note that although outstanding and acknowledged messages will eventually be removed from + * mailboxes, no guarantee can be made that a message will be delivered only once. */ + def fetchMessages[A]( + topic: Topic[A], + config: SubscriptionConfig = defaultSubscriptionConfig, + maxMessages: Int = defaultMaxMessages): Future[Seq[Message[A]]] + + /** Acknowledge that a given series of messages has been handled and should + * not be delivered again. + * + * Note that messages become eventually acknowledged and hence may be delivered more than once. + * @see fetchMessages() + */ + def acknowledgeMessages(messages: Seq[MessageId]): Future[Unit] + + /** Send a series of messages to a topic. + * + * The returned future will complete once messages have been accepted to the underlying bus. + * No guarantee can be made of their delivery to subscribers. */ + def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] + +} diff --git a/src/main/scala/xyz/driver/core/messaging/CreateBeforeStream.scala b/src/main/scala/xyz/driver/core/messaging/CreateBeforeStream.scala new file mode 100644 index 0000000..bd8e422 --- /dev/null +++ b/src/main/scala/xyz/driver/core/messaging/CreateBeforeStream.scala @@ -0,0 +1,32 @@ +package xyz.driver.core +package messaging + +import akka.stream.scaladsl.{Flow, Source} + +import scala.async.Async.{async, await} +import scala.concurrent.Future + +/** Utility mixin that will ensure that topics and subscriptions exist before attempting to stream from or to them. */ +trait CreateBeforeStream extends StreamBus { + + def createTopic(topic: Topic[_]): Future[Unit] + def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] + + override def publish[A](topic: Topic[A]): Flow[A, A, _] = { + def create(): Future[Flow[A, A, _]] = async { + await(createTopic(topic)) + super.publish(topic) + } + Flow.lazyInitAsync[A, A, Any](() => create()) + } + + override def subscribe[A](topic: Topic[A], config: SubscriptionConfig): Source[Message[A], _] = { + def create(): Future[Source[Message[A], _]] = async { + await(createTopic(topic)) + await(createSubscription(topic, config)) + super.subscribe(topic, config) + } + Source.fromFutureSource[Message[A], Any](create()) + } + +} diff --git a/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala b/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala new file mode 100644 index 0000000..9ee5c07 --- /dev/null +++ b/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala @@ -0,0 +1,258 @@ +package xyz.driver.core +package messaging + +import java.nio.ByteBuffer +import java.nio.file.{Files, Paths} +import java.security.Signature +import java.time.Instant +import java.util + +import com.google.auth.oauth2.ServiceAccountCredentials +import com.softwaremill.sttp._ +import spray.json.DefaultJsonProtocol._ +import spray.json._ + +import scala.async.Async.{async, await} +import scala.concurrent._ +import scala.concurrent.duration._ + +/** A message bus implemented by [[https://cloud.google.com/pubsub/docs/overview Google's Pub/Sub service.]] + * + * == Overview == + * + * The Pub/Sub message system is focused around a few concepts: 'topics', + * 'subscriptions' and 'subscribers'. Messages are sent to ''topics'' which may + * have multiple ''subscriptions'' associated to them. Every subscription to a + * topic will receive all messages sent to the topic. Messages are enqueued in + * a subscription until they are acknowledged by a ''subscriber''. Multiple + * subscribers may be associated to a subscription, in which case messages will + * get delivered arbitrarily among them. + * + * Topics and subscriptions are named resources which can be specified in + * Pub/Sub's configuration and may be queried. Subscribers on the other hand, + * are ephemeral processes that query a subscription on a regular basis, handle any + * messages and acknowledge them. + * + * == Delivery semantics == + * + * - at least once + * - no ordering + * + * == Retention == + * + * - configurable retry delay for unacknowledged messages, defaults to 10s + * - undeliverable messages are kept for 7 days + * + * @param credentials Google cloud credentials, usually the same as used by a + * service. Must have admin access to topics and + * descriptions. + * @param namespace The namespace in which this bus is running. Will be used to + * determine the exact name of topics and subscriptions. + * @param pullTimeout Delay after which a call to fetchMessages() will return an + * empty list, assuming that no messages have been received. + * @param executionContext Execution context to run any blocking commands. + * @param backend sttp backend used to query Pub/Sub's HTTP API + */ +class GoogleBus( + credentials: ServiceAccountCredentials, + namespace: String, + pullTimeout: Duration = 90.seconds +)(implicit val executionContext: ExecutionContext, backend: SttpBackend[Future, _]) + extends Bus with StreamBus with CreateBeforeStream { + import GoogleBus.Protocol + + case class MessageId(subscription: String, ackId: String) + + case class PubsubMessage[A](id: MessageId, data: A, publishTime: Instant) extends super.BasicMessage[A] + type Message[A] = PubsubMessage[A] + + /** Subscription-specific configuration + * + * @param subscriptionPrefix An identifier used to uniquely determine the name of Pub/Sub subscriptions. + * All messages sent to a subscription will be dispatched arbitrarily + * among any subscribers. Defaults to the email of the credentials used by this + * bus instance, thereby giving every service a unique subscription to every topic. + * To give every service instance a unique subscription, this must be changed to a + * unique value. + * @param ackTimeout Duration in which a message must be acknowledged before it is delivered again. + */ + case class SubscriptionConfig( + subscriptionPrefix: String = credentials.getClientEmail.split("@")(0), + ackTimeout: FiniteDuration = 10.seconds + ) + override val defaultSubscriptionConfig: SubscriptionConfig = SubscriptionConfig() + + /** Obtain an authentication token valid for the given duration + * https://developers.google.com/identity/protocols/OAuth2ServiceAccount + */ + private def freshAuthToken(duration: FiniteDuration): Future[String] = { + def jwt = { + val now = Instant.now().getEpochSecond + val base64 = util.Base64.getEncoder + val header = base64.encodeToString("""{"alg":"RS256","typ":"JWT"}""".getBytes("utf-8")) + val body = base64.encodeToString( + s"""|{ + | "iss": "${credentials.getClientEmail}", + | "scope": "https://www.googleapis.com/auth/pubsub", + | "aud": "https://www.googleapis.com/oauth2/v4/token", + | "exp": ${now + duration.toSeconds}, + | "iat": $now + |}""".stripMargin.getBytes("utf-8") + ) + val signer = Signature.getInstance("SHA256withRSA") + signer.initSign(credentials.getPrivateKey) + signer.update(s"$header.$body".getBytes("utf-8")) + val signature = base64.encodeToString(signer.sign()) + s"$header.$body.$signature" + } + sttp + .post(uri"https://www.googleapis.com/oauth2/v4/token") + .body( + "grant_type" -> "urn:ietf:params:oauth:grant-type:jwt-bearer", + "assertion" -> jwt + ) + .mapResponse(s => s.parseJson.asJsObject.fields("access_token").convertTo[String]) + .send() + .map(_.unsafeBody) + } + + // the token is cached a few minutes less than its validity to diminish latency of concurrent accesses at renewal time + private val getToken: () => Future[String] = Refresh.every(55.minutes)(freshAuthToken(60.minutes)) + + private val baseUri = uri"https://pubsub.googleapis.com/" + + private def rawTopicName(topic: Topic[_]) = + s"projects/${credentials.getProjectId}/topics/$namespace.${topic.name}" + private def rawSubscriptionName(config: SubscriptionConfig, topic: Topic[_]) = + s"projects/${credentials.getProjectId}/subscriptions/$namespace.${config.subscriptionPrefix}.${topic.name}" + + def createTopic(topic: Topic[_]): Future[Unit] = async { + val request = sttp + .put(baseUri.path(s"v1/${rawTopicName(topic)}")) + .auth + .bearer(await(getToken())) + val result = await(request.send()) + result.body match { + case Right(error) if result.code != 409 => // 409 <=> topic already exists + throw new NoSuchElementException(s"Error creating topic: Status code ${result.code}: ${error}") + case Left(_) => () + } + } + + def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = async { + val request = sttp + .put(baseUri.path(s"v1/${rawSubscriptionName(config, topic)}")) + .auth + .bearer(await(getToken())) + .body( + JsObject( + "topic" -> rawTopicName(topic).toJson, + "ackDeadlineSeconds" -> config.ackTimeout.toSeconds.toJson + ).compactPrint + ) + val result = await(request.send()) + result.body match { + case Right(error) if result.code != 409 => // 409 <=> subscription already exists + throw new NoSuchElementException(s"Error creating subscription: Status code ${result.code}: ${error}") + case Left(_) => () + } + } + + override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = async { + import Protocol.bufferFormat + val buffers: Seq[ByteBuffer] = messages.map(topic.serialize) + val request = sttp + .post(baseUri.path(s"v1/${rawTopicName(topic)}:publish")) + .auth + .bearer(await(getToken())) + .body( + JsObject("messages" -> buffers.map(buffer => JsObject("data" -> buffer.toJson)).toJson).compactPrint + ) + await(request.send()).unsafeBody + () + } + + override def fetchMessages[A]( + topic: Topic[A], + subscriptionConfig: SubscriptionConfig, + maxMessages: Int): Future[Seq[PubsubMessage[A]]] = async { + val subscription = rawSubscriptionName(subscriptionConfig, topic) + val request = sttp + .post(baseUri.path(s"v1/$subscription:pull")) + .auth + .bearer(await(getToken().map(x => x))) + .body( + JsObject( + "returnImmediately" -> JsFalse, + "maxMessages" -> JsNumber(maxMessages) + ).compactPrint + ) + .readTimeout(pullTimeout) + .mapResponse(_.parseJson) + + val messages = await(request.send()).unsafeBody match { + case JsObject(fields) if fields.isEmpty => Seq() + case obj => obj.convertTo[Protocol.SubscriptionPull].receivedMessages + } + + messages.map { msg => + PubsubMessage[A]( + MessageId(subscription, msg.ackId), + topic.deserialize(msg.message.data), + msg.message.publishTime + ) + } + } + + override def acknowledgeMessages(messageIds: Seq[MessageId]): Future[Unit] = async { + val request = sttp + .post(baseUri.path(s"v1/${messageIds.head.subscription}:acknowledge")) + .auth + .bearer(await(getToken())) + .body( + JsObject("ackIds" -> JsArray(messageIds.toVector.map(m => JsString(m.ackId)))).compactPrint + ) + await(request.send()).unsafeBody + () + } + +} + +object GoogleBus { + + private object Protocol extends DefaultJsonProtocol { + case class SubscriptionPull(receivedMessages: Seq[ReceivedMessage]) + case class ReceivedMessage(ackId: String, message: PubsubMessage) + case class PubsubMessage(data: ByteBuffer, publishTime: Instant) + + implicit val timeFormat: JsonFormat[Instant] = new JsonFormat[Instant] { + override def write(obj: Instant): JsValue = JsString(obj.toString) + override def read(json: JsValue): Instant = Instant.parse(json.convertTo[String]) + } + implicit val bufferFormat: JsonFormat[ByteBuffer] = new JsonFormat[ByteBuffer] { + override def write(obj: ByteBuffer): JsValue = + JsString(util.Base64.getEncoder.encodeToString(obj.array())) + + override def read(json: JsValue): ByteBuffer = { + val encodedBytes = json.convertTo[String].getBytes("utf-8") + val decodedBytes = util.Base64.getDecoder.decode(encodedBytes) + ByteBuffer.wrap(decodedBytes) + } + } + + implicit val pubsubMessageFormat: RootJsonFormat[PubsubMessage] = jsonFormat2(PubsubMessage) + implicit val receivedMessageFormat: RootJsonFormat[ReceivedMessage] = jsonFormat2(ReceivedMessage) + implicit val subscrptionPullFormat: RootJsonFormat[SubscriptionPull] = jsonFormat1(SubscriptionPull) + } + + def fromEnv(implicit executionContext: ExecutionContext, backend: SttpBackend[Future, _]): GoogleBus = { + def env(key: String) = { + require(sys.env.contains(key), s"Environment variable $key is not set.") + sys.env(key) + } + val keyfile = Paths.get(env("GOOGLE_APPLICATION_CREDENTIALS")) + val creds = ServiceAccountCredentials.fromStream(Files.newInputStream(keyfile)) + new GoogleBus(creds, env("SERVICE_NAMESPACE")) + } + +} diff --git a/src/main/scala/xyz/driver/core/messaging/QueueBus.scala b/src/main/scala/xyz/driver/core/messaging/QueueBus.scala new file mode 100644 index 0000000..d2ad073 --- /dev/null +++ b/src/main/scala/xyz/driver/core/messaging/QueueBus.scala @@ -0,0 +1,125 @@ +package xyz.driver.core +package messaging + +import java.nio.ByteBuffer + +import akka.actor.{Actor, ActorSystem, Props} + +import scala.collection.mutable +import scala.collection.mutable.Map +import scala.concurrent.duration._ +import scala.concurrent.{Future, Promise} + +/** A bus backed by an asynchronous queue. Note that this bus requires a local actor system + * and is intended for testing purposes only. */ +class QueueBus(implicit system: ActorSystem) extends Bus { + import system.dispatcher + + override type SubscriptionConfig = Long + override val defaultSubscriptionConfig: Long = 0 + + override type MessageId = (String, SubscriptionConfig, Long) + type Message[A] = BasicMessage[A] + + private object ActorMessages { + case class Send[A](topic: Topic[A], messages: Seq[A]) + case class Ack(messages: Seq[MessageId]) + case class Fetch[A](topic: Topic[A], cfg: SubscriptionConfig, max: Int, response: Promise[Seq[Message[A]]]) + case object Unack + } + + class Subscription { + val mailbox: mutable.Map[MessageId, ByteBuffer] = mutable.Map.empty + val unacked: mutable.Map[MessageId, ByteBuffer] = mutable.Map.empty + } + + private class BusMaster extends Actor { + var _id = 0L + def nextId(topic: String, cfg: SubscriptionConfig): (String, SubscriptionConfig, Long) = { + _id += 1; (topic, cfg, _id) + } + + val topics: mutable.Map[String, mutable.Map[SubscriptionConfig, Subscription]] = mutable.Map.empty + + def ensureSubscription(topic: String, cfg: SubscriptionConfig): Unit = { + topics.get(topic) match { + case Some(t) => + t.getOrElseUpdate(cfg, new Subscription) + case None => + topics += topic -> mutable.Map.empty + ensureSubscription(topic, cfg) + } + } + + override def preStart(): Unit = { + context.system.scheduler.schedule(1.seconds, 1.seconds) { + self ! ActorMessages.Unack + } + } + + override def receive: Receive = { + case ActorMessages.Send(topic, messages) => + val buffers = messages.map(topic.serialize) + val subscriptions = topics.getOrElse(topic.name, Map.empty) + for ((cfg, subscription) <- subscriptions) { + for (buffer <- buffers) { + subscription.mailbox += nextId(topic.name, cfg) -> buffer + } + } + + case ActorMessages.Fetch(topic, cfg, max, promise) => + ensureSubscription(topic.name, cfg) + val subscription = topics(topic.name)(cfg) + val messages = subscription.mailbox.take(max) + subscription.unacked ++= messages + subscription.mailbox --= messages.map(_._1) + promise.success(messages.toSeq.map { + case (ackId, buffer) => + new Message[Any] { + val id = ackId + val data = topic.deserialize(buffer) + } + }) + + case ActorMessages.Ack(messageIds) => + for (id @ (topic, cfg, _) <- messageIds) { + ensureSubscription(topic, cfg) + val subscription = topics(topic)(cfg) + subscription.unacked -= id + } + + case ActorMessages.Unack => + for ((_, subscriptions) <- topics) { + for ((_, subscription) <- subscriptions) { + subscription.mailbox ++= subscription.unacked + subscription.unacked.clear() + } + } + } + + } + + val actor = system.actorOf(Props(new BusMaster)) + + override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = Future { + actor ! ActorMessages.Send(topic, messages) + } + + override def fetchMessages[A]( + topic: messaging.Topic[A], + config: SubscriptionConfig, + maxMessages: Int): Future[Seq[Message[A]]] = { + val result = Promise[Seq[Message[A]]] + actor ! ActorMessages.Fetch(topic, config, maxMessages, result) + result.future + } + + override def acknowledgeMessages(ids: Seq[MessageId]): Future[Unit] = Future { + actor ! ActorMessages.Ack(ids) + } + +} + +object QueueBus { + def apply(implicit system: ActorSystem) = new QueueBus +} diff --git a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala new file mode 100644 index 0000000..aa99960 --- /dev/null +++ b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala @@ -0,0 +1,59 @@ +package xyz.driver.core +package messaging + +import akka.stream.scaladsl.{Flow, Sink, Source} + +import scala.collection.mutable.ListBuffer +import scala.concurrent.ExecutionContext + +/** An extension to message buses that offers an Akka-Streams API. + * + * Example usage of a stream that subscribes to one topic, prints any messages + * it receives and finally acknowledges them + * their receipt. + * {{{ + * val bus: StreamBus = ??? + * val topic = Topic.string("topic") + * bus.subscribe(topic1) + * .map{ msg => + * print(msg.data) + * msg + * } + * .to(bus.acknowledge) + * .run() + * }}} */ +trait StreamBus extends Bus { + implicit def executionContext: ExecutionContext + + /** Flow that publishes any messages to a given topic. + * Emits messages once have been published to the underlying bus. */ + def publish[A](topic: Topic[A]): Flow[A, A, _] = { + Flow[A] + .batch(defaultMaxMessages.toLong, a => ListBuffer[A](a))(_ += _) + .mapAsync(1) { a => + publishMessages(topic, a).map(_ => a) + } + .mapConcat(list => list.toList) + } + + /** Sink that acknowledges the receipt of a message. */ + def acknowledge: Sink[MessageId, _] = { + Flow[MessageId] + .batch(defaultMaxMessages.toLong, a => ListBuffer[MessageId](a))(_ += _) + .mapAsync(1)(acknowledgeMessages(_)) + .to(Sink.ignore) + } + + /** Source that listens to a subscription and receives any messages sent to its topic. */ + def subscribe[A](topic: Topic[A], config: SubscriptionConfig = defaultSubscriptionConfig): Source[Message[A], _] = { + Source + .unfoldAsync((topic, config))( + topicAndConfig => + fetchMessages(topicAndConfig._1, topicAndConfig._2, defaultMaxMessages).map(msgs => + Some(topicAndConfig -> msgs)) + ) + .filter(_.nonEmpty) + .mapConcat(messages => messages.toList) + } + +} diff --git a/src/main/scala/xyz/driver/core/messaging/Topic.scala b/src/main/scala/xyz/driver/core/messaging/Topic.scala new file mode 100644 index 0000000..32fd764 --- /dev/null +++ b/src/main/scala/xyz/driver/core/messaging/Topic.scala @@ -0,0 +1,43 @@ +package xyz.driver.core +package messaging + +import java.nio.ByteBuffer + +/** A topic is a named group of messages that all share a common schema. + * @tparam Message type of messages sent over this topic */ +trait Topic[Message] { + + /** Name of this topic (must be unique). */ + def name: String + + /** Convert a message to its wire format that will be sent over a bus. */ + def serialize(message: Message): ByteBuffer + + /** Convert a message from its wire format. */ + def deserialize(message: ByteBuffer): Message + +} + +object Topic { + + /** Create a new "raw" topic without a schema, providing access to the underlying bytes of messages. */ + def raw(name0: String): Topic[ByteBuffer] = new Topic[ByteBuffer] { + def name = name0 + override def serialize(message: ByteBuffer): ByteBuffer = message + override def deserialize(message: ByteBuffer): ByteBuffer = message + } + + /** Create a topic that represents data as UTF-8 encoded strings. */ + def string(name0: String): Topic[String] = new Topic[String] { + def name = name0 + override def serialize(message: String): ByteBuffer = { + ByteBuffer.wrap(message.getBytes("utf-8")) + } + override def deserialize(message: ByteBuffer): String = { + val bytes = new Array[Byte](message.remaining()) + message.get(bytes) + new String(bytes, "utf-8") + } + } + +} diff --git a/src/main/scala/xyz/driver/core/pubsub.scala b/src/main/scala/xyz/driver/core/pubsub.scala index 6d2667f..34b2140 100644 --- a/src/main/scala/xyz/driver/core/pubsub.scala +++ b/src/main/scala/xyz/driver/core/pubsub.scala @@ -12,6 +12,7 @@ import com.typesafe.scalalogging.Logger import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.{Failure, Try} +@deprecated("Use the message bus implementation in xyz.driver.core.messaging.GoogleBus", "1.11.5") object pubsub { trait PubsubPublisher[Message] { diff --git a/src/test/scala/xyz/driver/core/messaging/QueueBusTest.scala b/src/test/scala/xyz/driver/core/messaging/QueueBusTest.scala new file mode 100644 index 0000000..8dd0776 --- /dev/null +++ b/src/test/scala/xyz/driver/core/messaging/QueueBusTest.scala @@ -0,0 +1,30 @@ +package xyz.driver.core.messaging + +import akka.actor.ActorSystem +import org.scalatest.FlatSpec +import org.scalatest.concurrent.ScalaFutures + +import scala.concurrent.ExecutionContext +import scala.concurrent.duration._ + +class QueueBusTest extends FlatSpec with ScalaFutures { + implicit val patience: PatienceConfig = PatienceConfig(timeout = 10.seconds) + + def busBehaviour(bus: Bus)(implicit ec: ExecutionContext): Unit = { + + it should "deliver messages to a subscriber" in { + val topic = Topic.string("test.topic1") + bus.fetchMessages(topic).futureValue + bus.publishMessages(topic, Seq("hello world!")) + Thread.sleep(100) + val messages = bus.fetchMessages(topic) + assert(messages.futureValue.map(_.data).toList == List("hello world!")) + } + } + + implicit val system: ActorSystem = ActorSystem("queue-test") + import system.dispatcher + + "A queue-based bus" should behave like busBehaviour(new QueueBus) + +} -- cgit v1.2.3