From 7c755c77afbd67ae2ded9d8b004736d4e27e208f Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Wed, 12 Sep 2018 16:18:26 -0700 Subject: Move storage and messaging to separate projects --- .../xyz/driver/core/messaging/AliyunBus.scala | 157 ------------ src/main/scala/xyz/driver/core/messaging/Bus.scala | 74 ------ .../xyz/driver/core/messaging/CreateOnDemand.scala | 49 ---- .../xyz/driver/core/messaging/GoogleBus.scala | 267 --------------------- .../scala/xyz/driver/core/messaging/QueueBus.scala | 126 ---------- .../xyz/driver/core/messaging/StreamBus.scala | 102 -------- .../scala/xyz/driver/core/messaging/Topic.scala | 43 ---- .../driver/core/storage/AliyunBlobStorage.scala | 108 --------- .../xyz/driver/core/storage/BlobStorage.scala | 50 ---- .../core/storage/FileSystemBlobStorage.scala | 82 ------- .../xyz/driver/core/storage/GcsBlobStorage.scala | 96 -------- .../xyz/driver/core/storage/channelStreams.scala | 112 --------- 12 files changed, 1266 deletions(-) delete mode 100644 src/main/scala/xyz/driver/core/messaging/AliyunBus.scala delete mode 100644 src/main/scala/xyz/driver/core/messaging/Bus.scala delete mode 100644 src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala delete mode 100644 src/main/scala/xyz/driver/core/messaging/GoogleBus.scala delete mode 100644 src/main/scala/xyz/driver/core/messaging/QueueBus.scala delete mode 100644 src/main/scala/xyz/driver/core/messaging/StreamBus.scala delete mode 100644 src/main/scala/xyz/driver/core/messaging/Topic.scala delete mode 100644 src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala delete mode 100644 src/main/scala/xyz/driver/core/storage/BlobStorage.scala delete mode 100644 src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala delete mode 100644 src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala delete mode 100644 src/main/scala/xyz/driver/core/storage/channelStreams.scala (limited to 'src/main/scala/xyz/driver') diff --git a/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala b/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala deleted file mode 100644 index 8b7bca7..0000000 --- a/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala +++ /dev/null @@ -1,157 +0,0 @@ -package xyz.driver.core.messaging -import java.nio.ByteBuffer -import java.util - -import com.aliyun.mns.client.{AsyncCallback, CloudAccount} -import com.aliyun.mns.common.ServiceException -import com.aliyun.mns.model -import com.aliyun.mns.model._ - -import scala.concurrent.duration._ -import scala.concurrent.{ExecutionContext, Future, Promise} - -class AliyunBus( - accountId: String, - accessId: String, - accessSecret: String, - region: String, - namespace: String, - pullTimeout: Int -)(implicit val executionContext: ExecutionContext) - extends Bus { - private val endpoint = s"https://$accountId.mns.$region.aliyuncs.com" - private val cloudAccount = new CloudAccount(accessId, accessSecret, endpoint) - private val client = cloudAccount.getMNSClient - - // When calling the asyncBatchPopMessage endpoint, alicloud returns an error if no message is received before the - // pullTimeout. This error is documented as MessageNotExist, however it's been observed to return InternalServerError - // occasionally. We mask both of these errors and return an empty list of messages - private val MaskedErrorCodes: Set[String] = Set("MessageNotExist", "InternalServerError") - - override val defaultMaxMessages: Int = 10 - - case class MessageId(queueName: String, messageHandle: String) - - case class Message[A](id: MessageId, data: A) extends BasicMessage[A] - - case class SubscriptionConfig( - subscriptionPrefix: String = accessId, - ackTimeout: FiniteDuration = 10.seconds - ) - - override val defaultSubscriptionConfig: SubscriptionConfig = SubscriptionConfig() - - private def rawTopicName(topic: Topic[_]) = - s"$namespace-${topic.name}" - private def rawSubscriptionName(config: SubscriptionConfig, topic: Topic[_]) = - s"$namespace-${config.subscriptionPrefix}-${topic.name}" - - override def fetchMessages[A]( - topic: Topic[A], - config: SubscriptionConfig, - maxMessages: Int): Future[Seq[Message[A]]] = { - import collection.JavaConverters._ - val subscriptionName = rawSubscriptionName(config, topic) - val queueRef = client.getQueueRef(subscriptionName) - - val promise = Promise[Seq[model.Message]] - queueRef.asyncBatchPopMessage( - maxMessages, - pullTimeout, - new AsyncCallback[util.List[model.Message]] { - override def onSuccess(result: util.List[model.Message]): Unit = { - promise.success(result.asScala) - } - override def onFail(ex: Exception): Unit = ex match { - case serviceException: ServiceException if MaskedErrorCodes(serviceException.getErrorCode) => - promise.success(Nil) - case _ => - promise.failure(ex) - } - } - ) - - promise.future.map(_.map { message => - import scala.xml.XML - val messageId = MessageId(subscriptionName, message.getReceiptHandle) - val messageXML = XML.loadString(message.getMessageBodyAsRawString) - val messageNode = messageXML \ "Message" - val messageBytes = java.util.Base64.getDecoder.decode(messageNode.head.text) - - val deserializedMessage = topic.deserialize(ByteBuffer.wrap(messageBytes)) - Message(messageId, deserializedMessage) - }) - } - - override def acknowledgeMessages(messages: Seq[MessageId]): Future[Unit] = { - import collection.JavaConverters._ - require(messages.nonEmpty, "Acknowledged message list must be non-empty") - - val queueRef = client.getQueueRef(messages.head.queueName) - - val promise = Promise[Unit] - queueRef.asyncBatchDeleteMessage( - messages.map(_.messageHandle).asJava, - new AsyncCallback[Void] { - override def onSuccess(result: Void): Unit = promise.success(()) - override def onFail(ex: Exception): Unit = promise.failure(ex) - } - ) - - promise.future - } - - override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = { - val topicRef = client.getTopicRef(rawTopicName(topic)) - - val publishMessages = messages.map { message => - val promise = Promise[TopicMessage] - - val topicMessage = new Base64TopicMessage - topicMessage.setMessageBody(topic.serialize(message).array()) - - topicRef.asyncPublishMessage( - topicMessage, - new AsyncCallback[TopicMessage] { - override def onSuccess(result: TopicMessage): Unit = promise.success(result) - override def onFail(ex: Exception): Unit = promise.failure(ex) - } - ) - - promise.future - } - - Future.sequence(publishMessages).map(_ => ()) - } - - def createTopic(topic: Topic[_]): Future[Unit] = Future { - val topicName = rawTopicName(topic) - val topicExists = Option(client.listTopic(topicName, "", 1)).exists(!_.getResult.isEmpty) - if (!topicExists) { - val topicMeta = new TopicMeta - topicMeta.setTopicName(topicName) - client.createTopic(topicMeta) - } - } - - def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = Future { - val subscriptionName = rawSubscriptionName(config, topic) - val queueExists = Option(client.listQueue(subscriptionName, "", 1)).exists(!_.getResult.isEmpty) - - if (!queueExists) { - val topicName = rawTopicName(topic) - val topicRef = client.getTopicRef(topicName) - - val queueMeta = new QueueMeta - queueMeta.setQueueName(subscriptionName) - queueMeta.setVisibilityTimeout(config.ackTimeout.toSeconds) - client.createQueue(queueMeta) - - val subscriptionMeta = new SubscriptionMeta - subscriptionMeta.setSubscriptionName(subscriptionName) - subscriptionMeta.setTopicName(topicName) - subscriptionMeta.setEndpoint(topicRef.generateQueueEndpoint(subscriptionName)) - topicRef.subscribe(subscriptionMeta) - } - } -} diff --git a/src/main/scala/xyz/driver/core/messaging/Bus.scala b/src/main/scala/xyz/driver/core/messaging/Bus.scala deleted file mode 100644 index 75954f4..0000000 --- a/src/main/scala/xyz/driver/core/messaging/Bus.scala +++ /dev/null @@ -1,74 +0,0 @@ -package xyz.driver.core -package messaging - -import scala.concurrent._ -import scala.language.higherKinds - -/** Base trait for representing message buses. - * - * Message buses are expected to provide "at least once" delivery semantics and are - * expected to retry delivery when a message remains unacknowledged for a reasonable - * amount of time. */ -trait Bus { - - /** Type of unique message identifiers. Usually a string or UUID. */ - type MessageId - - /** Most general kind of message. Any implementation of a message bus must provide - * the fields and methods specified in this trait. */ - trait BasicMessage[A] { self: Message[A] => - - /** All messages must have unique IDs so that they can be acknowledged unambiguously. */ - def id: MessageId - - /** Actual message content. */ - def data: A - - } - - /** Actual type of messages provided by this bus. This must be a subtype of BasicMessage - * (as that defines the minimal required fields of a messages), but may be refined to - * provide bus-specific additional data. */ - type Message[A] <: BasicMessage[A] - - /** Type of a bus-specific configuration object can be used to tweak settings of subscriptions. */ - type SubscriptionConfig - - /** Default value for a subscription configuration. It is such that any service will have a unique subscription - * for every topic, shared among all its instances. */ - val defaultSubscriptionConfig: SubscriptionConfig - - /** Maximum amount of messages handled in a single retrieval call. */ - val defaultMaxMessages = 64 - - /** Execution context that is used to query and dispatch messages from this bus. */ - implicit val executionContext: ExecutionContext - - /** Retrieve any new messages in the mailbox of a subscription. - * - * Any retrieved messages become "outstanding" and should not be returned by this function - * again until a reasonable (bus-specific) amount of time has passed and they remain unacknowledged. - * In that case, they will again be considered new and will be returned by this function. - * - * Note that although outstanding and acknowledged messages will eventually be removed from - * mailboxes, no guarantee can be made that a message will be delivered only once. */ - def fetchMessages[A]( - topic: Topic[A], - config: SubscriptionConfig = defaultSubscriptionConfig, - maxMessages: Int = defaultMaxMessages): Future[Seq[Message[A]]] - - /** Acknowledge that a given series of messages has been handled and should - * not be delivered again. - * - * Note that messages become eventually acknowledged and hence may be delivered more than once. - * @see fetchMessages() - */ - def acknowledgeMessages(messages: Seq[MessageId]): Future[Unit] - - /** Send a series of messages to a topic. - * - * The returned future will complete once messages have been accepted to the underlying bus. - * No guarantee can be made of their delivery to subscribers. */ - def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] - -} diff --git a/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala b/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala deleted file mode 100644 index 1af5308..0000000 --- a/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala +++ /dev/null @@ -1,49 +0,0 @@ -package xyz.driver.core -package messaging - -import java.util.concurrent.ConcurrentHashMap - -import scala.async.Async.{async, await} -import scala.concurrent.Future - -/** Utility mixin that will ensure that topics and subscriptions exist before - * attempting to read or write from or to them. - */ -trait CreateOnDemand extends Bus { - - /** Create the given topic. This operation is idempotent and is expected to succeed if the topic - * already exists. - */ - def createTopic(topic: Topic[_]): Future[Unit] - - /** Create the given subscription. This operation is idempotent and is expected to succeed if the subscription - * already exists. - */ - def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] - - private val createdTopics = new ConcurrentHashMap[Topic[_], Future[Unit]] - private val createdSubscriptions = new ConcurrentHashMap[(Topic[_], SubscriptionConfig), Future[Unit]] - - private def ensureTopic(topic: Topic[_]) = - createdTopics.computeIfAbsent(topic, t => createTopic(t)) - - private def ensureSubscription(topic: Topic[_], config: SubscriptionConfig) = - createdSubscriptions.computeIfAbsent(topic -> config, { - case (t, c) => createSubscription(t, c) - }) - - abstract override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = async { - await(ensureTopic(topic)) - await(super.publishMessages(topic, messages)) - } - - abstract override def fetchMessages[A]( - topic: Topic[A], - config: SubscriptionConfig, - maxMessages: Int): Future[Seq[Message[A]]] = async { - await(ensureTopic(topic)) - await(ensureSubscription(topic, config)) - await(super.fetchMessages(topic, config, maxMessages)) - } - -} diff --git a/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala b/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala deleted file mode 100644 index b296c50..0000000 --- a/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala +++ /dev/null @@ -1,267 +0,0 @@ -package xyz.driver.core -package messaging - -import java.nio.ByteBuffer -import java.nio.file.{Files, Path, Paths} -import java.security.Signature -import java.time.Instant -import java.util - -import com.google.auth.oauth2.ServiceAccountCredentials -import com.softwaremill.sttp._ -import spray.json.DefaultJsonProtocol._ -import spray.json._ - -import scala.async.Async.{async, await} -import scala.concurrent._ -import scala.concurrent.duration._ - -/** A message bus implemented by [[https://cloud.google.com/pubsub/docs/overview Google's Pub/Sub service.]] - * - * == Overview == - * - * The Pub/Sub message system is focused around a few concepts: 'topics', - * 'subscriptions' and 'subscribers'. Messages are sent to ''topics'' which may - * have multiple ''subscriptions'' associated to them. Every subscription to a - * topic will receive all messages sent to the topic. Messages are enqueued in - * a subscription until they are acknowledged by a ''subscriber''. Multiple - * subscribers may be associated to a subscription, in which case messages will - * get delivered arbitrarily among them. - * - * Topics and subscriptions are named resources which can be specified in - * Pub/Sub's configuration and may be queried. Subscribers on the other hand, - * are ephemeral processes that query a subscription on a regular basis, handle any - * messages and acknowledge them. - * - * == Delivery semantics == - * - * - at least once - * - no ordering - * - * == Retention == - * - * - configurable retry delay for unacknowledged messages, defaults to 10s - * - undeliverable messages are kept for 7 days - * - * @param credentials Google cloud credentials, usually the same as used by a - * service. Must have admin access to topics and - * descriptions. - * @param namespace The namespace in which this bus is running. Will be used to - * determine the exact name of topics and subscriptions. - * @param pullTimeout Delay after which a call to fetchMessages() will return an - * empty list, assuming that no messages have been received. - * @param executionContext Execution context to run any blocking commands. - * @param backend sttp backend used to query Pub/Sub's HTTP API - */ -class GoogleBus( - credentials: ServiceAccountCredentials, - namespace: String, - pullTimeout: Duration = 90.seconds -)(implicit val executionContext: ExecutionContext, backend: SttpBackend[Future, _]) - extends Bus { - import GoogleBus.Protocol - - case class MessageId(subscription: String, ackId: String) - - case class PubsubMessage[A](id: MessageId, data: A, publishTime: Instant) extends super.BasicMessage[A] - type Message[A] = PubsubMessage[A] - - /** Subscription-specific configuration - * - * @param subscriptionPrefix An identifier used to uniquely determine the name of Pub/Sub subscriptions. - * All messages sent to a subscription will be dispatched arbitrarily - * among any subscribers. Defaults to the email of the credentials used by this - * bus instance, thereby giving every service a unique subscription to every topic. - * To give every service instance a unique subscription, this must be changed to a - * unique value. - * @param ackTimeout Duration in which a message must be acknowledged before it is delivered again. - */ - case class SubscriptionConfig( - subscriptionPrefix: String = credentials.getClientEmail.split("@")(0), - ackTimeout: FiniteDuration = 10.seconds - ) - override val defaultSubscriptionConfig: SubscriptionConfig = SubscriptionConfig() - - /** Obtain an authentication token valid for the given duration - * https://developers.google.com/identity/protocols/OAuth2ServiceAccount - */ - private def freshAuthToken(duration: FiniteDuration): Future[String] = { - def jwt = { - val now = Instant.now().getEpochSecond - val base64 = util.Base64.getEncoder - val header = base64.encodeToString("""{"alg":"RS256","typ":"JWT"}""".getBytes("utf-8")) - val body = base64.encodeToString( - s"""|{ - | "iss": "${credentials.getClientEmail}", - | "scope": "https://www.googleapis.com/auth/pubsub", - | "aud": "https://www.googleapis.com/oauth2/v4/token", - | "exp": ${now + duration.toSeconds}, - | "iat": $now - |}""".stripMargin.getBytes("utf-8") - ) - val signer = Signature.getInstance("SHA256withRSA") - signer.initSign(credentials.getPrivateKey) - signer.update(s"$header.$body".getBytes("utf-8")) - val signature = base64.encodeToString(signer.sign()) - s"$header.$body.$signature" - } - sttp - .post(uri"https://www.googleapis.com/oauth2/v4/token") - .body( - "grant_type" -> "urn:ietf:params:oauth:grant-type:jwt-bearer", - "assertion" -> jwt - ) - .mapResponse(s => s.parseJson.asJsObject.fields("access_token").convertTo[String]) - .send() - .map(_.unsafeBody) - } - - // the token is cached a few minutes less than its validity to diminish latency of concurrent accesses at renewal time - private val getToken: () => Future[String] = Refresh.every(55.minutes)(freshAuthToken(60.minutes)) - - private val baseUri = uri"https://pubsub.googleapis.com/" - - private def rawTopicName(topic: Topic[_]) = - s"projects/${credentials.getProjectId}/topics/$namespace.${topic.name}" - private def rawSubscriptionName(config: SubscriptionConfig, topic: Topic[_]) = - s"projects/${credentials.getProjectId}/subscriptions/$namespace.${config.subscriptionPrefix}.${topic.name}" - - def createTopic(topic: Topic[_]): Future[Unit] = async { - val request = sttp - .put(baseUri.path(s"v1/${rawTopicName(topic)}")) - .auth - .bearer(await(getToken())) - val result = await(request.send()) - result.body match { - case Left(error) if result.code != 409 => // 409 <=> topic already exists, ignore it - throw new NoSuchElementException(s"Error creating topic: Status code ${result.code}: $error") - case _ => () - } - } - - def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = async { - val request = sttp - .put(baseUri.path(s"v1/${rawSubscriptionName(config, topic)}")) - .auth - .bearer(await(getToken())) - .body( - JsObject( - "topic" -> rawTopicName(topic).toJson, - "ackDeadlineSeconds" -> config.ackTimeout.toSeconds.toJson - ).compactPrint - ) - val result = await(request.send()) - result.body match { - case Left(error) if result.code != 409 => // 409 <=> subscription already exists, ignore it - throw new NoSuchElementException(s"Error creating subscription: Status code ${result.code}: $error") - case _ => () - } - } - - override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = async { - import Protocol.bufferFormat - val buffers: Seq[ByteBuffer] = messages.map(topic.serialize) - val request = sttp - .post(baseUri.path(s"v1/${rawTopicName(topic)}:publish")) - .auth - .bearer(await(getToken())) - .body( - JsObject("messages" -> buffers.map(buffer => JsObject("data" -> buffer.toJson)).toJson).compactPrint - ) - await(request.send()).unsafeBody - () - } - - override def fetchMessages[A]( - topic: Topic[A], - subscriptionConfig: SubscriptionConfig, - maxMessages: Int): Future[Seq[PubsubMessage[A]]] = async { - val subscription = rawSubscriptionName(subscriptionConfig, topic) - val request = sttp - .post(baseUri.path(s"v1/$subscription:pull")) - .auth - .bearer(await(getToken().map(x => x))) - .body( - JsObject( - "returnImmediately" -> JsFalse, - "maxMessages" -> JsNumber(maxMessages) - ).compactPrint - ) - .readTimeout(pullTimeout) - .mapResponse(_.parseJson) - - val messages = await(request.send()).unsafeBody match { - case JsObject(fields) if fields.isEmpty => Seq() - case obj => obj.convertTo[Protocol.SubscriptionPull].receivedMessages - } - - messages.map { msg => - PubsubMessage[A]( - MessageId(subscription, msg.ackId), - topic.deserialize(msg.message.data), - msg.message.publishTime - ) - } - } - - override def acknowledgeMessages(messageIds: Seq[MessageId]): Future[Unit] = async { - val request = sttp - .post(baseUri.path(s"v1/${messageIds.head.subscription}:acknowledge")) - .auth - .bearer(await(getToken())) - .body( - JsObject("ackIds" -> JsArray(messageIds.toVector.map(m => JsString(m.ackId)))).compactPrint - ) - await(request.send()).unsafeBody - () - } - -} - -object GoogleBus { - - private object Protocol extends DefaultJsonProtocol { - case class SubscriptionPull(receivedMessages: Seq[ReceivedMessage]) - case class ReceivedMessage(ackId: String, message: PubsubMessage) - case class PubsubMessage(data: ByteBuffer, publishTime: Instant) - - implicit val timeFormat: JsonFormat[Instant] = new JsonFormat[Instant] { - override def write(obj: Instant): JsValue = JsString(obj.toString) - override def read(json: JsValue): Instant = Instant.parse(json.convertTo[String]) - } - implicit val bufferFormat: JsonFormat[ByteBuffer] = new JsonFormat[ByteBuffer] { - override def write(obj: ByteBuffer): JsValue = - JsString(util.Base64.getEncoder.encodeToString(obj.array())) - - override def read(json: JsValue): ByteBuffer = { - val encodedBytes = json.convertTo[String].getBytes("utf-8") - val decodedBytes = util.Base64.getDecoder.decode(encodedBytes) - ByteBuffer.wrap(decodedBytes) - } - } - - implicit val pubsubMessageFormat: RootJsonFormat[PubsubMessage] = jsonFormat2(PubsubMessage) - implicit val receivedMessageFormat: RootJsonFormat[ReceivedMessage] = jsonFormat2(ReceivedMessage) - implicit val subscrptionPullFormat: RootJsonFormat[SubscriptionPull] = jsonFormat1(SubscriptionPull) - } - - def fromKeyfile(keyfile: Path, namespace: String)( - implicit executionContext: ExecutionContext, - backend: SttpBackend[Future, _]): GoogleBus = { - val creds = ServiceAccountCredentials.fromStream(Files.newInputStream(keyfile)) - new GoogleBus(creds, namespace) - } - - @deprecated( - "Reading from the environment adds opaque dependencies and hance leads to extra complexity. Use fromKeyfile instead.", - "driver-core 1.12.2") - def fromEnv(implicit executionContext: ExecutionContext, backend: SttpBackend[Future, _]): GoogleBus = { - def env(key: String) = { - require(sys.env.contains(key), s"Environment variable $key is not set.") - sys.env(key) - } - val keyfile = Paths.get(env("GOOGLE_APPLICATION_CREDENTIALS")) - fromKeyfile(keyfile, env("SERVICE_NAMESPACE")) - } - -} diff --git a/src/main/scala/xyz/driver/core/messaging/QueueBus.scala b/src/main/scala/xyz/driver/core/messaging/QueueBus.scala deleted file mode 100644 index 45c9ed5..0000000 --- a/src/main/scala/xyz/driver/core/messaging/QueueBus.scala +++ /dev/null @@ -1,126 +0,0 @@ -package xyz.driver.core -package messaging - -import java.nio.ByteBuffer - -import akka.actor.{Actor, ActorSystem, Props} - -import scala.collection.mutable -import scala.collection.mutable.Map -import scala.concurrent.duration._ -import scala.concurrent.{Future, Promise} - -/** A bus backed by an asynchronous queue. Note that this bus requires a local actor system - * and is intended for testing purposes only. */ -class QueueBus(implicit system: ActorSystem) extends Bus { - import system.dispatcher - - override type SubscriptionConfig = Long - override val defaultSubscriptionConfig: Long = 0 - override val executionContext = system.dispatcher - - override type MessageId = (String, SubscriptionConfig, Long) - type Message[A] = BasicMessage[A] - - private object ActorMessages { - case class Send[A](topic: Topic[A], messages: Seq[A]) - case class Ack(messages: Seq[MessageId]) - case class Fetch[A](topic: Topic[A], cfg: SubscriptionConfig, max: Int, response: Promise[Seq[Message[A]]]) - case object Unack - } - - class Subscription { - val mailbox: mutable.Map[MessageId, ByteBuffer] = mutable.Map.empty - val unacked: mutable.Map[MessageId, ByteBuffer] = mutable.Map.empty - } - - private class BusMaster extends Actor { - var _id = 0L - def nextId(topic: String, cfg: SubscriptionConfig): (String, SubscriptionConfig, Long) = { - _id += 1; (topic, cfg, _id) - } - - val topics: mutable.Map[String, mutable.Map[SubscriptionConfig, Subscription]] = mutable.Map.empty - - def ensureSubscription(topic: String, cfg: SubscriptionConfig): Unit = { - topics.get(topic) match { - case Some(t) => - t.getOrElseUpdate(cfg, new Subscription) - case None => - topics += topic -> mutable.Map.empty - ensureSubscription(topic, cfg) - } - } - - override def preStart(): Unit = { - context.system.scheduler.schedule(1.seconds, 1.seconds) { - self ! ActorMessages.Unack - } - } - - override def receive: Receive = { - case ActorMessages.Send(topic, messages) => - val buffers = messages.map(topic.serialize) - val subscriptions = topics.getOrElse(topic.name, Map.empty) - for ((cfg, subscription) <- subscriptions) { - for (buffer <- buffers) { - subscription.mailbox += nextId(topic.name, cfg) -> buffer - } - } - - case ActorMessages.Fetch(topic, cfg, max, promise) => - ensureSubscription(topic.name, cfg) - val subscription = topics(topic.name)(cfg) - val messages = subscription.mailbox.take(max) - subscription.unacked ++= messages - subscription.mailbox --= messages.map(_._1) - promise.success(messages.toSeq.map { - case (ackId, buffer) => - new Message[Any] { - val id = ackId - val data = topic.deserialize(buffer) - } - }) - - case ActorMessages.Ack(messageIds) => - for (id @ (topic, cfg, _) <- messageIds) { - ensureSubscription(topic, cfg) - val subscription = topics(topic)(cfg) - subscription.unacked -= id - } - - case ActorMessages.Unack => - for ((_, subscriptions) <- topics) { - for ((_, subscription) <- subscriptions) { - subscription.mailbox ++= subscription.unacked - subscription.unacked.clear() - } - } - } - - } - - val actor = system.actorOf(Props(new BusMaster)) - - override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = Future { - actor ! ActorMessages.Send(topic, messages) - } - - override def fetchMessages[A]( - topic: messaging.Topic[A], - config: SubscriptionConfig, - maxMessages: Int): Future[Seq[Message[A]]] = { - val result = Promise[Seq[Message[A]]] - actor ! ActorMessages.Fetch(topic, config, maxMessages, result) - result.future - } - - override def acknowledgeMessages(ids: Seq[MessageId]): Future[Unit] = Future { - actor ! ActorMessages.Ack(ids) - } - -} - -object QueueBus { - def apply(implicit system: ActorSystem) = new QueueBus -} diff --git a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala deleted file mode 100644 index 44d75cd..0000000 --- a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala +++ /dev/null @@ -1,102 +0,0 @@ -package xyz.driver.core -package messaging - -import akka.NotUsed -import akka.stream.Materializer -import akka.stream.scaladsl.{Flow, RestartSource, Sink, Source} - -import scala.collection.mutable.ListBuffer -import scala.concurrent.Future -import scala.concurrent.duration._ - -/** An extension to message buses that offers an Akka-Streams API. - * - * Example usage of a stream that subscribes to one topic, prints any messages - * it receives and finally acknowledges them - * their receipt. - * {{{ - * val bus: StreamBus = ??? - * val topic = Topic.string("topic") - * bus.subscribe(topic1) - * .map{ msg => - * print(msg.data) - * msg - * } - * .to(bus.acknowledge) - * .run() - * }}} */ -trait StreamBus extends Bus { - - /** Flow that publishes any messages to a given topic. - * Emits messages once they have been published to the underlying bus. */ - def publish[A](topic: Topic[A]): Flow[A, A, NotUsed] = { - Flow[A] - .batch(defaultMaxMessages.toLong, a => ListBuffer[A](a))(_ += _) - .mapAsync(1) { a => - publishMessages(topic, a).map(_ => a) - } - .mapConcat(list => list.toList) - } - - /** Sink that acknowledges the receipt of a message. */ - def acknowledge: Sink[MessageId, NotUsed] = { - Flow[MessageId] - .batch(defaultMaxMessages.toLong, a => ListBuffer[MessageId](a))(_ += _) - .mapAsync(1)(acknowledgeMessages(_)) - .to(Sink.ignore) - } - - /** Source that listens to a subscription and receives any messages sent to its topic. */ - def subscribe[A]( - topic: Topic[A], - config: SubscriptionConfig = defaultSubscriptionConfig): Source[Message[A], NotUsed] = { - Source - .unfoldAsync((topic, config))( - topicAndConfig => - fetchMessages(topicAndConfig._1, topicAndConfig._2, defaultMaxMessages).map(msgs => - Some(topicAndConfig -> msgs)) - ) - .filter(_.nonEmpty) - .mapConcat(messages => messages.toList) - } - - def runWithRestart[A]( - topic: Topic[A], - config: SubscriptionConfig = defaultSubscriptionConfig, - minBackoff: FiniteDuration = 3.seconds, - maxBackoff: FiniteDuration = 30.seconds, - randomFactor: Double = 0.2, - maxRestarts: Int = 20 - )(processMessage: Flow[Message[A], List[MessageId], NotUsed])(implicit mat: Materializer): NotUsed = { - RestartSource - .withBackoff[MessageId]( - minBackoff, - maxBackoff, - randomFactor, - maxRestarts - ) { () => - subscribe(topic, config) - .via(processMessage) - .log(topic.name) - .mapConcat(identity) - } - .to(acknowledge) - .run() - } - - def handleMessage[A]( - topic: Topic[A], - config: SubscriptionConfig = defaultSubscriptionConfig, - parallelism: Int = 1, - minBackoff: FiniteDuration = 3.seconds, - maxBackoff: FiniteDuration = 30.seconds, - randomFactor: Double = 0.2, - maxRestarts: Int = 20 - )(processMessage: A => Future[_])(implicit mat: Materializer): NotUsed = { - runWithRestart(topic, config, minBackoff, maxBackoff, randomFactor, maxRestarts) { - Flow[Message[A]].mapAsync(parallelism) { message => - processMessage(message.data).map(_ => message.id :: Nil) - } - } - } -} diff --git a/src/main/scala/xyz/driver/core/messaging/Topic.scala b/src/main/scala/xyz/driver/core/messaging/Topic.scala deleted file mode 100644 index 32fd764..0000000 --- a/src/main/scala/xyz/driver/core/messaging/Topic.scala +++ /dev/null @@ -1,43 +0,0 @@ -package xyz.driver.core -package messaging - -import java.nio.ByteBuffer - -/** A topic is a named group of messages that all share a common schema. - * @tparam Message type of messages sent over this topic */ -trait Topic[Message] { - - /** Name of this topic (must be unique). */ - def name: String - - /** Convert a message to its wire format that will be sent over a bus. */ - def serialize(message: Message): ByteBuffer - - /** Convert a message from its wire format. */ - def deserialize(message: ByteBuffer): Message - -} - -object Topic { - - /** Create a new "raw" topic without a schema, providing access to the underlying bytes of messages. */ - def raw(name0: String): Topic[ByteBuffer] = new Topic[ByteBuffer] { - def name = name0 - override def serialize(message: ByteBuffer): ByteBuffer = message - override def deserialize(message: ByteBuffer): ByteBuffer = message - } - - /** Create a topic that represents data as UTF-8 encoded strings. */ - def string(name0: String): Topic[String] = new Topic[String] { - def name = name0 - override def serialize(message: String): ByteBuffer = { - ByteBuffer.wrap(message.getBytes("utf-8")) - } - override def deserialize(message: ByteBuffer): String = { - val bytes = new Array[Byte](message.remaining()) - message.get(bytes) - new String(bytes, "utf-8") - } - } - -} diff --git a/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala b/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala deleted file mode 100644 index b5e8678..0000000 --- a/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala +++ /dev/null @@ -1,108 +0,0 @@ -package xyz.driver.core.storage - -import java.io.ByteArrayInputStream -import java.net.URL -import java.nio.file.Path -import java.util.Date - -import akka.Done -import akka.stream.scaladsl.{Sink, Source, StreamConverters} -import akka.util.ByteString -import com.aliyun.oss.OSSClient -import com.aliyun.oss.model.ObjectPermission -import com.typesafe.config.Config -import xyz.driver.core.time.provider.TimeProvider - -import scala.collection.JavaConverters._ -import scala.concurrent.duration.Duration -import scala.concurrent.{ExecutionContext, Future} - -class AliyunBlobStorage( - client: OSSClient, - bucketId: String, - timeProvider: TimeProvider, - chunkSize: Int = AliyunBlobStorage.DefaultChunkSize)(implicit ec: ExecutionContext) - extends SignedBlobStorage { - override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future { - client.putObject(bucketId, name, new ByteArrayInputStream(content)) - name - } - - override def uploadFile(name: String, content: Path): Future[String] = Future { - client.putObject(bucketId, name, content.toFile) - name - } - - override def exists(name: String): Future[Boolean] = Future { - client.doesObjectExist(bucketId, name) - } - - override def list(prefix: String): Future[Set[String]] = Future { - client.listObjects(bucketId, prefix).getObjectSummaries.asScala.map(_.getKey)(collection.breakOut) - } - - override def content(name: String): Future[Option[Array[Byte]]] = Future { - Option(client.getObject(bucketId, name)).map { obj => - val inputStream = obj.getObjectContent - Stream.continually(inputStream.read).takeWhile(_ != -1).map(_.toByte).toArray - } - } - - override def download(name: String): Future[Option[Source[ByteString, Any]]] = Future { - Option(client.getObject(bucketId, name)).map { obj => - StreamConverters.fromInputStream(() => obj.getObjectContent, chunkSize) - } - } - - override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future { - StreamConverters - .asInputStream() - .mapMaterializedValue(is => - Future { - client.putObject(bucketId, name, is) - Done - }) - } - - override def delete(name: String): Future[String] = Future { - client.deleteObject(bucketId, name) - name - } - - override def url(name: String): Future[Option[URL]] = Future { - // Based on https://www.alibabacloud.com/help/faq-detail/39607.htm - Option(client.getObjectAcl(bucketId, name)).map { acl => - val isPrivate = acl.getPermission == ObjectPermission.Private - val bucket = client.getBucketInfo(bucketId).getBucket - val endpointUrl = if (isPrivate) bucket.getIntranetEndpoint else bucket.getExtranetEndpoint - new URL(s"https://$bucketId.$endpointUrl/$name") - } - } - - override def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]] = Future { - if (client.doesObjectExist(bucketId, name)) { - val expiration = new Date(timeProvider.currentTime().advanceBy(duration).millis) - Some(client.generatePresignedUrl(bucketId, name, expiration)) - } else { - None - } - } -} - -object AliyunBlobStorage { - val DefaultChunkSize: Int = 8192 - - def apply(config: Config, bucketId: String, timeProvider: TimeProvider)( - implicit ec: ExecutionContext): AliyunBlobStorage = { - val clientId = config.getString("storage.aliyun.clientId") - val clientSecret = config.getString("storage.aliyun.clientSecret") - val endpoint = config.getString("storage.aliyun.endpoint") - this(clientId, clientSecret, endpoint, bucketId, timeProvider) - } - - def apply(clientId: String, clientSecret: String, endpoint: String, bucketId: String, timeProvider: TimeProvider)( - implicit ec: ExecutionContext): AliyunBlobStorage = { - val client = new OSSClient(endpoint, clientId, clientSecret) - new AliyunBlobStorage(client, bucketId, timeProvider) - } -} diff --git a/src/main/scala/xyz/driver/core/storage/BlobStorage.scala b/src/main/scala/xyz/driver/core/storage/BlobStorage.scala deleted file mode 100644 index 0cde96a..0000000 --- a/src/main/scala/xyz/driver/core/storage/BlobStorage.scala +++ /dev/null @@ -1,50 +0,0 @@ -package xyz.driver.core.storage - -import java.net.URL -import java.nio.file.Path - -import akka.Done -import akka.stream.scaladsl.{Sink, Source} -import akka.util.ByteString - -import scala.concurrent.Future -import scala.concurrent.duration.Duration - -/** Binary key-value store, typically implemented by cloud storage. */ -trait BlobStorage { - - /** Upload data by value. */ - def uploadContent(name: String, content: Array[Byte]): Future[String] - - /** Upload data from an existing file. */ - def uploadFile(name: String, content: Path): Future[String] - - def exists(name: String): Future[Boolean] - - /** List available keys. The prefix determines which keys should be listed - * and depends on the implementation (for instance, a file system backed - * blob store will treat a prefix as a directory path). */ - def list(prefix: String): Future[Set[String]] - - /** Get all the content of a given object. */ - def content(name: String): Future[Option[Array[Byte]]] - - /** Stream data asynchronously and with backpressure. */ - def download(name: String): Future[Option[Source[ByteString, Any]]] - - /** Get a sink to upload data. */ - def upload(name: String): Future[Sink[ByteString, Future[Done]]] - - /** Delete a stored value. */ - def delete(name: String): Future[String] - - /** - * Path to specified resource. Checks that the resource exists and returns None if - * it is not found. Depending on the implementation, may throw. - */ - def url(name: String): Future[Option[URL]] -} - -trait SignedBlobStorage extends BlobStorage { - def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]] -} diff --git a/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala b/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala deleted file mode 100644 index e12c73d..0000000 --- a/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala +++ /dev/null @@ -1,82 +0,0 @@ -package xyz.driver.core.storage - -import java.net.URL -import java.nio.file.{Files, Path, StandardCopyOption} - -import akka.stream.scaladsl.{FileIO, Sink, Source} -import akka.util.ByteString -import akka.{Done, NotUsed} - -import scala.collection.JavaConverters._ -import scala.concurrent.{ExecutionContext, Future} - -/** A blob store that is backed by a local filesystem. All objects are stored relative to the given - * root path. Slashes ('/') in blob names are treated as usual path separators and are converted - * to directories. */ -class FileSystemBlobStorage(root: Path)(implicit ec: ExecutionContext) extends BlobStorage { - - private def ensureParents(file: Path): Path = { - Files.createDirectories(file.getParent()) - file - } - - private def file(name: String) = root.resolve(name) - - override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future { - Files.write(ensureParents(file(name)), content) - name - } - override def uploadFile(name: String, content: Path): Future[String] = Future { - Files.copy(content, ensureParents(file(name)), StandardCopyOption.REPLACE_EXISTING) - name - } - - override def exists(name: String): Future[Boolean] = Future { - val path = file(name) - Files.exists(path) && Files.isReadable(path) - } - - override def list(prefix: String): Future[Set[String]] = Future { - val dir = file(prefix) - Files - .list(dir) - .iterator() - .asScala - .map(p => root.relativize(p)) - .map(_.toString) - .toSet - } - - override def content(name: String): Future[Option[Array[Byte]]] = exists(name) map { - case true => - Some(Files.readAllBytes(file(name))) - case false => None - } - - override def download(name: String): Future[Option[Source[ByteString, NotUsed]]] = Future { - if (Files.exists(file(name))) { - Some(FileIO.fromPath(file(name)).mapMaterializedValue(_ => NotUsed)) - } else { - None - } - } - - override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future { - val f = ensureParents(file(name)) - FileIO.toPath(f).mapMaterializedValue(_.map(_ => Done)) - } - - override def delete(name: String): Future[String] = exists(name).map { e => - if (e) { - Files.delete(file(name)) - } - name - } - - override def url(name: String): Future[Option[URL]] = exists(name) map { - case true => - Some(root.resolve(name).toUri.toURL) - case false => - None - } -} diff --git a/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala b/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala deleted file mode 100644 index 95164c7..0000000 --- a/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala +++ /dev/null @@ -1,96 +0,0 @@ -package xyz.driver.core.storage - -import java.io.{FileInputStream, InputStream} -import java.net.URL -import java.nio.file.Path - -import akka.Done -import akka.stream.scaladsl.Sink -import akka.util.ByteString -import com.google.api.gax.paging.Page -import com.google.auth.oauth2.ServiceAccountCredentials -import com.google.cloud.storage.Storage.BlobListOption -import com.google.cloud.storage.{Blob, BlobId, Bucket, Storage, StorageOptions} - -import scala.collection.JavaConverters._ -import scala.concurrent.duration.Duration -import scala.concurrent.{ExecutionContext, Future} - -class GcsBlobStorage(client: Storage, bucketId: String, chunkSize: Int = GcsBlobStorage.DefaultChunkSize)( - implicit ec: ExecutionContext) - extends BlobStorage with SignedBlobStorage { - - private val bucket: Bucket = client.get(bucketId) - require(bucket != null, s"Bucket $bucketId does not exist.") - - override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future { - bucket.create(name, content).getBlobId.getName - } - - override def uploadFile(name: String, content: Path): Future[String] = Future { - bucket.create(name, new FileInputStream(content.toFile)).getBlobId.getName - } - - override def exists(name: String): Future[Boolean] = Future { - bucket.get(name) != null - } - - override def list(prefix: String): Future[Set[String]] = Future { - val page: Page[Blob] = bucket.list(BlobListOption.prefix(prefix)) - page - .iterateAll() - .asScala - .map(_.getName()) - .toSet - } - - override def content(name: String): Future[Option[Array[Byte]]] = Future { - Option(bucket.get(name)).map(blob => blob.getContent()) - } - - override def download(name: String) = Future { - Option(bucket.get(name)).map { blob => - ChannelStream.fromChannel(() => blob.reader(), chunkSize) - } - } - - override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future { - val blob = bucket.create(name, Array.emptyByteArray) - ChannelStream.toChannel(() => blob.writer(), chunkSize) - } - - override def delete(name: String): Future[String] = Future { - client.delete(BlobId.of(bucketId, name)) - name - } - - override def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]] = Future { - Option(bucket.get(name)).map(blob => blob.signUrl(duration.length, duration.unit)) - } - - override def url(name: String): Future[Option[URL]] = Future { - val protocol: String = "https" - val resourcePath: String = s"storage.googleapis.com/${bucket.getName}/" - Option(bucket.get(name)).map { blob => - new URL(protocol, resourcePath, blob.getName) - } - } -} - -object GcsBlobStorage { - final val DefaultChunkSize = 8192 - - private def newClient(key: InputStream): Storage = - StorageOptions - .newBuilder() - .setCredentials(ServiceAccountCredentials.fromStream(key)) - .build() - .getService() - - def fromKeyfile(keyfile: Path, bucketId: String, chunkSize: Int = DefaultChunkSize)( - implicit ec: ExecutionContext): GcsBlobStorage = { - val client = newClient(new FileInputStream(keyfile.toFile)) - new GcsBlobStorage(client, bucketId, chunkSize) - } - -} diff --git a/src/main/scala/xyz/driver/core/storage/channelStreams.scala b/src/main/scala/xyz/driver/core/storage/channelStreams.scala deleted file mode 100644 index fc652be..0000000 --- a/src/main/scala/xyz/driver/core/storage/channelStreams.scala +++ /dev/null @@ -1,112 +0,0 @@ -package xyz.driver.core.storage - -import java.nio.ByteBuffer -import java.nio.channels.{ReadableByteChannel, WritableByteChannel} - -import akka.stream._ -import akka.stream.scaladsl.{Sink, Source} -import akka.stream.stage._ -import akka.util.ByteString -import akka.{Done, NotUsed} - -import scala.concurrent.{Future, Promise} -import scala.util.control.NonFatal - -class ChannelSource(createChannel: () => ReadableByteChannel, chunkSize: Int) - extends GraphStage[SourceShape[ByteString]] { - - val out = Outlet[ByteString]("ChannelSource.out") - val shape = SourceShape(out) - - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { - val channel = createChannel() - - object Handler extends OutHandler { - override def onPull(): Unit = { - try { - val buffer = ByteBuffer.allocate(chunkSize) - if (channel.read(buffer) > 0) { - buffer.flip() - push(out, ByteString.fromByteBuffer(buffer)) - } else { - completeStage() - } - } catch { - case NonFatal(_) => - channel.close() - } - } - override def onDownstreamFinish(): Unit = { - channel.close() - } - } - - setHandler(out, Handler) - } - -} - -class ChannelSink(createChannel: () => WritableByteChannel, chunkSize: Int) - extends GraphStageWithMaterializedValue[SinkShape[ByteString], Future[Done]] { - - val in = Inlet[ByteString]("ChannelSink.in") - val shape = SinkShape(in) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = { - val promise = Promise[Done]() - val logic = new GraphStageLogic(shape) { - val channel = createChannel() - - object Handler extends InHandler { - override def onPush(): Unit = { - try { - val data = grab(in) - channel.write(data.asByteBuffer) - pull(in) - } catch { - case NonFatal(e) => - channel.close() - promise.failure(e) - } - } - - override def onUpstreamFinish(): Unit = { - channel.close() - completeStage() - promise.success(Done) - } - - override def onUpstreamFailure(ex: Throwable): Unit = { - channel.close() - promise.failure(ex) - } - } - - setHandler(in, Handler) - - override def preStart(): Unit = { - pull(in) - } - } - (logic, promise.future) - } - -} - -object ChannelStream { - - def fromChannel(channel: () => ReadableByteChannel, chunkSize: Int = 8192): Source[ByteString, NotUsed] = { - Source - .fromGraph(new ChannelSource(channel, chunkSize)) - .withAttributes(Attributes(ActorAttributes.IODispatcher)) - .async - } - - def toChannel(channel: () => WritableByteChannel, chunkSize: Int = 8192): Sink[ByteString, Future[Done]] = { - Sink - .fromGraph(new ChannelSink(channel, chunkSize)) - .withAttributes(Attributes(ActorAttributes.IODispatcher)) - .async - } - -} -- cgit v1.2.3