From 7c755c77afbd67ae2ded9d8b004736d4e27e208f Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Wed, 12 Sep 2018 16:18:26 -0700 Subject: Move storage and messaging to separate projects --- .../xyz/driver/core/messaging/AliyunBus.scala | 157 ++++++++++++ .../main/scala/xyz/driver/core/messaging/Bus.scala | 74 ++++++ .../xyz/driver/core/messaging/CreateOnDemand.scala | 49 ++++ .../xyz/driver/core/messaging/GoogleBus.scala | 267 +++++++++++++++++++++ .../scala/xyz/driver/core/messaging/QueueBus.scala | 126 ++++++++++ .../xyz/driver/core/messaging/StreamBus.scala | 102 ++++++++ .../scala/xyz/driver/core/messaging/Topic.scala | 43 ++++ .../xyz/driver/core/messaging/QueueBusTest.scala | 30 +++ .../driver/core/storage/AliyunBlobStorage.scala | 108 +++++++++ .../xyz/driver/core/storage/BlobStorage.scala | 50 ++++ .../core/storage/FileSystemBlobStorage.scala | 82 +++++++ .../xyz/driver/core/storage/GcsBlobStorage.scala | 96 ++++++++ .../xyz/driver/core/storage/channelStreams.scala | 112 +++++++++ .../scala/xyz/driver/core/BlobStorageTest.scala | 94 ++++++++ .../xyz/driver/core/messaging/AliyunBus.scala | 157 ------------ src/main/scala/xyz/driver/core/messaging/Bus.scala | 74 ------ .../xyz/driver/core/messaging/CreateOnDemand.scala | 49 ---- .../xyz/driver/core/messaging/GoogleBus.scala | 267 --------------------- .../scala/xyz/driver/core/messaging/QueueBus.scala | 126 ---------- .../xyz/driver/core/messaging/StreamBus.scala | 102 -------- .../scala/xyz/driver/core/messaging/Topic.scala | 43 ---- .../driver/core/storage/AliyunBlobStorage.scala | 108 --------- .../xyz/driver/core/storage/BlobStorage.scala | 50 ---- .../core/storage/FileSystemBlobStorage.scala | 82 ------- .../xyz/driver/core/storage/GcsBlobStorage.scala | 96 -------- .../xyz/driver/core/storage/channelStreams.scala | 112 --------- .../scala/xyz/driver/core/BlobStorageTest.scala | 94 -------- .../xyz/driver/core/messaging/QueueBusTest.scala | 30 --- 28 files changed, 1390 insertions(+), 1390 deletions(-) create mode 100644 core-messaging/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala create mode 100644 core-messaging/src/main/scala/xyz/driver/core/messaging/Bus.scala create mode 100644 core-messaging/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala create mode 100644 core-messaging/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala create mode 100644 core-messaging/src/main/scala/xyz/driver/core/messaging/QueueBus.scala create mode 100644 core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala create mode 100644 core-messaging/src/main/scala/xyz/driver/core/messaging/Topic.scala create mode 100644 core-messaging/src/test/scala/xyz/driver/core/messaging/QueueBusTest.scala create mode 100644 core-storage/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala create mode 100644 core-storage/src/main/scala/xyz/driver/core/storage/BlobStorage.scala create mode 100644 core-storage/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala create mode 100644 core-storage/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala create mode 100644 core-storage/src/main/scala/xyz/driver/core/storage/channelStreams.scala create mode 100644 core-storage/src/test/scala/xyz/driver/core/BlobStorageTest.scala delete mode 100644 src/main/scala/xyz/driver/core/messaging/AliyunBus.scala delete mode 100644 src/main/scala/xyz/driver/core/messaging/Bus.scala delete mode 100644 src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala delete mode 100644 src/main/scala/xyz/driver/core/messaging/GoogleBus.scala delete mode 100644 src/main/scala/xyz/driver/core/messaging/QueueBus.scala delete mode 100644 src/main/scala/xyz/driver/core/messaging/StreamBus.scala delete mode 100644 src/main/scala/xyz/driver/core/messaging/Topic.scala delete mode 100644 src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala delete mode 100644 src/main/scala/xyz/driver/core/storage/BlobStorage.scala delete mode 100644 src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala delete mode 100644 src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala delete mode 100644 src/main/scala/xyz/driver/core/storage/channelStreams.scala delete mode 100644 src/test/scala/xyz/driver/core/BlobStorageTest.scala delete mode 100644 src/test/scala/xyz/driver/core/messaging/QueueBusTest.scala diff --git a/core-messaging/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala b/core-messaging/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala new file mode 100644 index 0000000..8b7bca7 --- /dev/null +++ b/core-messaging/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala @@ -0,0 +1,157 @@ +package xyz.driver.core.messaging +import java.nio.ByteBuffer +import java.util + +import com.aliyun.mns.client.{AsyncCallback, CloudAccount} +import com.aliyun.mns.common.ServiceException +import com.aliyun.mns.model +import com.aliyun.mns.model._ + +import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, Future, Promise} + +class AliyunBus( + accountId: String, + accessId: String, + accessSecret: String, + region: String, + namespace: String, + pullTimeout: Int +)(implicit val executionContext: ExecutionContext) + extends Bus { + private val endpoint = s"https://$accountId.mns.$region.aliyuncs.com" + private val cloudAccount = new CloudAccount(accessId, accessSecret, endpoint) + private val client = cloudAccount.getMNSClient + + // When calling the asyncBatchPopMessage endpoint, alicloud returns an error if no message is received before the + // pullTimeout. This error is documented as MessageNotExist, however it's been observed to return InternalServerError + // occasionally. We mask both of these errors and return an empty list of messages + private val MaskedErrorCodes: Set[String] = Set("MessageNotExist", "InternalServerError") + + override val defaultMaxMessages: Int = 10 + + case class MessageId(queueName: String, messageHandle: String) + + case class Message[A](id: MessageId, data: A) extends BasicMessage[A] + + case class SubscriptionConfig( + subscriptionPrefix: String = accessId, + ackTimeout: FiniteDuration = 10.seconds + ) + + override val defaultSubscriptionConfig: SubscriptionConfig = SubscriptionConfig() + + private def rawTopicName(topic: Topic[_]) = + s"$namespace-${topic.name}" + private def rawSubscriptionName(config: SubscriptionConfig, topic: Topic[_]) = + s"$namespace-${config.subscriptionPrefix}-${topic.name}" + + override def fetchMessages[A]( + topic: Topic[A], + config: SubscriptionConfig, + maxMessages: Int): Future[Seq[Message[A]]] = { + import collection.JavaConverters._ + val subscriptionName = rawSubscriptionName(config, topic) + val queueRef = client.getQueueRef(subscriptionName) + + val promise = Promise[Seq[model.Message]] + queueRef.asyncBatchPopMessage( + maxMessages, + pullTimeout, + new AsyncCallback[util.List[model.Message]] { + override def onSuccess(result: util.List[model.Message]): Unit = { + promise.success(result.asScala) + } + override def onFail(ex: Exception): Unit = ex match { + case serviceException: ServiceException if MaskedErrorCodes(serviceException.getErrorCode) => + promise.success(Nil) + case _ => + promise.failure(ex) + } + } + ) + + promise.future.map(_.map { message => + import scala.xml.XML + val messageId = MessageId(subscriptionName, message.getReceiptHandle) + val messageXML = XML.loadString(message.getMessageBodyAsRawString) + val messageNode = messageXML \ "Message" + val messageBytes = java.util.Base64.getDecoder.decode(messageNode.head.text) + + val deserializedMessage = topic.deserialize(ByteBuffer.wrap(messageBytes)) + Message(messageId, deserializedMessage) + }) + } + + override def acknowledgeMessages(messages: Seq[MessageId]): Future[Unit] = { + import collection.JavaConverters._ + require(messages.nonEmpty, "Acknowledged message list must be non-empty") + + val queueRef = client.getQueueRef(messages.head.queueName) + + val promise = Promise[Unit] + queueRef.asyncBatchDeleteMessage( + messages.map(_.messageHandle).asJava, + new AsyncCallback[Void] { + override def onSuccess(result: Void): Unit = promise.success(()) + override def onFail(ex: Exception): Unit = promise.failure(ex) + } + ) + + promise.future + } + + override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = { + val topicRef = client.getTopicRef(rawTopicName(topic)) + + val publishMessages = messages.map { message => + val promise = Promise[TopicMessage] + + val topicMessage = new Base64TopicMessage + topicMessage.setMessageBody(topic.serialize(message).array()) + + topicRef.asyncPublishMessage( + topicMessage, + new AsyncCallback[TopicMessage] { + override def onSuccess(result: TopicMessage): Unit = promise.success(result) + override def onFail(ex: Exception): Unit = promise.failure(ex) + } + ) + + promise.future + } + + Future.sequence(publishMessages).map(_ => ()) + } + + def createTopic(topic: Topic[_]): Future[Unit] = Future { + val topicName = rawTopicName(topic) + val topicExists = Option(client.listTopic(topicName, "", 1)).exists(!_.getResult.isEmpty) + if (!topicExists) { + val topicMeta = new TopicMeta + topicMeta.setTopicName(topicName) + client.createTopic(topicMeta) + } + } + + def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = Future { + val subscriptionName = rawSubscriptionName(config, topic) + val queueExists = Option(client.listQueue(subscriptionName, "", 1)).exists(!_.getResult.isEmpty) + + if (!queueExists) { + val topicName = rawTopicName(topic) + val topicRef = client.getTopicRef(topicName) + + val queueMeta = new QueueMeta + queueMeta.setQueueName(subscriptionName) + queueMeta.setVisibilityTimeout(config.ackTimeout.toSeconds) + client.createQueue(queueMeta) + + val subscriptionMeta = new SubscriptionMeta + subscriptionMeta.setSubscriptionName(subscriptionName) + subscriptionMeta.setTopicName(topicName) + subscriptionMeta.setEndpoint(topicRef.generateQueueEndpoint(subscriptionName)) + topicRef.subscribe(subscriptionMeta) + } + } +} diff --git a/core-messaging/src/main/scala/xyz/driver/core/messaging/Bus.scala b/core-messaging/src/main/scala/xyz/driver/core/messaging/Bus.scala new file mode 100644 index 0000000..75954f4 --- /dev/null +++ b/core-messaging/src/main/scala/xyz/driver/core/messaging/Bus.scala @@ -0,0 +1,74 @@ +package xyz.driver.core +package messaging + +import scala.concurrent._ +import scala.language.higherKinds + +/** Base trait for representing message buses. + * + * Message buses are expected to provide "at least once" delivery semantics and are + * expected to retry delivery when a message remains unacknowledged for a reasonable + * amount of time. */ +trait Bus { + + /** Type of unique message identifiers. Usually a string or UUID. */ + type MessageId + + /** Most general kind of message. Any implementation of a message bus must provide + * the fields and methods specified in this trait. */ + trait BasicMessage[A] { self: Message[A] => + + /** All messages must have unique IDs so that they can be acknowledged unambiguously. */ + def id: MessageId + + /** Actual message content. */ + def data: A + + } + + /** Actual type of messages provided by this bus. This must be a subtype of BasicMessage + * (as that defines the minimal required fields of a messages), but may be refined to + * provide bus-specific additional data. */ + type Message[A] <: BasicMessage[A] + + /** Type of a bus-specific configuration object can be used to tweak settings of subscriptions. */ + type SubscriptionConfig + + /** Default value for a subscription configuration. It is such that any service will have a unique subscription + * for every topic, shared among all its instances. */ + val defaultSubscriptionConfig: SubscriptionConfig + + /** Maximum amount of messages handled in a single retrieval call. */ + val defaultMaxMessages = 64 + + /** Execution context that is used to query and dispatch messages from this bus. */ + implicit val executionContext: ExecutionContext + + /** Retrieve any new messages in the mailbox of a subscription. + * + * Any retrieved messages become "outstanding" and should not be returned by this function + * again until a reasonable (bus-specific) amount of time has passed and they remain unacknowledged. + * In that case, they will again be considered new and will be returned by this function. + * + * Note that although outstanding and acknowledged messages will eventually be removed from + * mailboxes, no guarantee can be made that a message will be delivered only once. */ + def fetchMessages[A]( + topic: Topic[A], + config: SubscriptionConfig = defaultSubscriptionConfig, + maxMessages: Int = defaultMaxMessages): Future[Seq[Message[A]]] + + /** Acknowledge that a given series of messages has been handled and should + * not be delivered again. + * + * Note that messages become eventually acknowledged and hence may be delivered more than once. + * @see fetchMessages() + */ + def acknowledgeMessages(messages: Seq[MessageId]): Future[Unit] + + /** Send a series of messages to a topic. + * + * The returned future will complete once messages have been accepted to the underlying bus. + * No guarantee can be made of their delivery to subscribers. */ + def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] + +} diff --git a/core-messaging/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala b/core-messaging/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala new file mode 100644 index 0000000..1af5308 --- /dev/null +++ b/core-messaging/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala @@ -0,0 +1,49 @@ +package xyz.driver.core +package messaging + +import java.util.concurrent.ConcurrentHashMap + +import scala.async.Async.{async, await} +import scala.concurrent.Future + +/** Utility mixin that will ensure that topics and subscriptions exist before + * attempting to read or write from or to them. + */ +trait CreateOnDemand extends Bus { + + /** Create the given topic. This operation is idempotent and is expected to succeed if the topic + * already exists. + */ + def createTopic(topic: Topic[_]): Future[Unit] + + /** Create the given subscription. This operation is idempotent and is expected to succeed if the subscription + * already exists. + */ + def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] + + private val createdTopics = new ConcurrentHashMap[Topic[_], Future[Unit]] + private val createdSubscriptions = new ConcurrentHashMap[(Topic[_], SubscriptionConfig), Future[Unit]] + + private def ensureTopic(topic: Topic[_]) = + createdTopics.computeIfAbsent(topic, t => createTopic(t)) + + private def ensureSubscription(topic: Topic[_], config: SubscriptionConfig) = + createdSubscriptions.computeIfAbsent(topic -> config, { + case (t, c) => createSubscription(t, c) + }) + + abstract override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = async { + await(ensureTopic(topic)) + await(super.publishMessages(topic, messages)) + } + + abstract override def fetchMessages[A]( + topic: Topic[A], + config: SubscriptionConfig, + maxMessages: Int): Future[Seq[Message[A]]] = async { + await(ensureTopic(topic)) + await(ensureSubscription(topic, config)) + await(super.fetchMessages(topic, config, maxMessages)) + } + +} diff --git a/core-messaging/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala b/core-messaging/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala new file mode 100644 index 0000000..b296c50 --- /dev/null +++ b/core-messaging/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala @@ -0,0 +1,267 @@ +package xyz.driver.core +package messaging + +import java.nio.ByteBuffer +import java.nio.file.{Files, Path, Paths} +import java.security.Signature +import java.time.Instant +import java.util + +import com.google.auth.oauth2.ServiceAccountCredentials +import com.softwaremill.sttp._ +import spray.json.DefaultJsonProtocol._ +import spray.json._ + +import scala.async.Async.{async, await} +import scala.concurrent._ +import scala.concurrent.duration._ + +/** A message bus implemented by [[https://cloud.google.com/pubsub/docs/overview Google's Pub/Sub service.]] + * + * == Overview == + * + * The Pub/Sub message system is focused around a few concepts: 'topics', + * 'subscriptions' and 'subscribers'. Messages are sent to ''topics'' which may + * have multiple ''subscriptions'' associated to them. Every subscription to a + * topic will receive all messages sent to the topic. Messages are enqueued in + * a subscription until they are acknowledged by a ''subscriber''. Multiple + * subscribers may be associated to a subscription, in which case messages will + * get delivered arbitrarily among them. + * + * Topics and subscriptions are named resources which can be specified in + * Pub/Sub's configuration and may be queried. Subscribers on the other hand, + * are ephemeral processes that query a subscription on a regular basis, handle any + * messages and acknowledge them. + * + * == Delivery semantics == + * + * - at least once + * - no ordering + * + * == Retention == + * + * - configurable retry delay for unacknowledged messages, defaults to 10s + * - undeliverable messages are kept for 7 days + * + * @param credentials Google cloud credentials, usually the same as used by a + * service. Must have admin access to topics and + * descriptions. + * @param namespace The namespace in which this bus is running. Will be used to + * determine the exact name of topics and subscriptions. + * @param pullTimeout Delay after which a call to fetchMessages() will return an + * empty list, assuming that no messages have been received. + * @param executionContext Execution context to run any blocking commands. + * @param backend sttp backend used to query Pub/Sub's HTTP API + */ +class GoogleBus( + credentials: ServiceAccountCredentials, + namespace: String, + pullTimeout: Duration = 90.seconds +)(implicit val executionContext: ExecutionContext, backend: SttpBackend[Future, _]) + extends Bus { + import GoogleBus.Protocol + + case class MessageId(subscription: String, ackId: String) + + case class PubsubMessage[A](id: MessageId, data: A, publishTime: Instant) extends super.BasicMessage[A] + type Message[A] = PubsubMessage[A] + + /** Subscription-specific configuration + * + * @param subscriptionPrefix An identifier used to uniquely determine the name of Pub/Sub subscriptions. + * All messages sent to a subscription will be dispatched arbitrarily + * among any subscribers. Defaults to the email of the credentials used by this + * bus instance, thereby giving every service a unique subscription to every topic. + * To give every service instance a unique subscription, this must be changed to a + * unique value. + * @param ackTimeout Duration in which a message must be acknowledged before it is delivered again. + */ + case class SubscriptionConfig( + subscriptionPrefix: String = credentials.getClientEmail.split("@")(0), + ackTimeout: FiniteDuration = 10.seconds + ) + override val defaultSubscriptionConfig: SubscriptionConfig = SubscriptionConfig() + + /** Obtain an authentication token valid for the given duration + * https://developers.google.com/identity/protocols/OAuth2ServiceAccount + */ + private def freshAuthToken(duration: FiniteDuration): Future[String] = { + def jwt = { + val now = Instant.now().getEpochSecond + val base64 = util.Base64.getEncoder + val header = base64.encodeToString("""{"alg":"RS256","typ":"JWT"}""".getBytes("utf-8")) + val body = base64.encodeToString( + s"""|{ + | "iss": "${credentials.getClientEmail}", + | "scope": "https://www.googleapis.com/auth/pubsub", + | "aud": "https://www.googleapis.com/oauth2/v4/token", + | "exp": ${now + duration.toSeconds}, + | "iat": $now + |}""".stripMargin.getBytes("utf-8") + ) + val signer = Signature.getInstance("SHA256withRSA") + signer.initSign(credentials.getPrivateKey) + signer.update(s"$header.$body".getBytes("utf-8")) + val signature = base64.encodeToString(signer.sign()) + s"$header.$body.$signature" + } + sttp + .post(uri"https://www.googleapis.com/oauth2/v4/token") + .body( + "grant_type" -> "urn:ietf:params:oauth:grant-type:jwt-bearer", + "assertion" -> jwt + ) + .mapResponse(s => s.parseJson.asJsObject.fields("access_token").convertTo[String]) + .send() + .map(_.unsafeBody) + } + + // the token is cached a few minutes less than its validity to diminish latency of concurrent accesses at renewal time + private val getToken: () => Future[String] = Refresh.every(55.minutes)(freshAuthToken(60.minutes)) + + private val baseUri = uri"https://pubsub.googleapis.com/" + + private def rawTopicName(topic: Topic[_]) = + s"projects/${credentials.getProjectId}/topics/$namespace.${topic.name}" + private def rawSubscriptionName(config: SubscriptionConfig, topic: Topic[_]) = + s"projects/${credentials.getProjectId}/subscriptions/$namespace.${config.subscriptionPrefix}.${topic.name}" + + def createTopic(topic: Topic[_]): Future[Unit] = async { + val request = sttp + .put(baseUri.path(s"v1/${rawTopicName(topic)}")) + .auth + .bearer(await(getToken())) + val result = await(request.send()) + result.body match { + case Left(error) if result.code != 409 => // 409 <=> topic already exists, ignore it + throw new NoSuchElementException(s"Error creating topic: Status code ${result.code}: $error") + case _ => () + } + } + + def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = async { + val request = sttp + .put(baseUri.path(s"v1/${rawSubscriptionName(config, topic)}")) + .auth + .bearer(await(getToken())) + .body( + JsObject( + "topic" -> rawTopicName(topic).toJson, + "ackDeadlineSeconds" -> config.ackTimeout.toSeconds.toJson + ).compactPrint + ) + val result = await(request.send()) + result.body match { + case Left(error) if result.code != 409 => // 409 <=> subscription already exists, ignore it + throw new NoSuchElementException(s"Error creating subscription: Status code ${result.code}: $error") + case _ => () + } + } + + override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = async { + import Protocol.bufferFormat + val buffers: Seq[ByteBuffer] = messages.map(topic.serialize) + val request = sttp + .post(baseUri.path(s"v1/${rawTopicName(topic)}:publish")) + .auth + .bearer(await(getToken())) + .body( + JsObject("messages" -> buffers.map(buffer => JsObject("data" -> buffer.toJson)).toJson).compactPrint + ) + await(request.send()).unsafeBody + () + } + + override def fetchMessages[A]( + topic: Topic[A], + subscriptionConfig: SubscriptionConfig, + maxMessages: Int): Future[Seq[PubsubMessage[A]]] = async { + val subscription = rawSubscriptionName(subscriptionConfig, topic) + val request = sttp + .post(baseUri.path(s"v1/$subscription:pull")) + .auth + .bearer(await(getToken().map(x => x))) + .body( + JsObject( + "returnImmediately" -> JsFalse, + "maxMessages" -> JsNumber(maxMessages) + ).compactPrint + ) + .readTimeout(pullTimeout) + .mapResponse(_.parseJson) + + val messages = await(request.send()).unsafeBody match { + case JsObject(fields) if fields.isEmpty => Seq() + case obj => obj.convertTo[Protocol.SubscriptionPull].receivedMessages + } + + messages.map { msg => + PubsubMessage[A]( + MessageId(subscription, msg.ackId), + topic.deserialize(msg.message.data), + msg.message.publishTime + ) + } + } + + override def acknowledgeMessages(messageIds: Seq[MessageId]): Future[Unit] = async { + val request = sttp + .post(baseUri.path(s"v1/${messageIds.head.subscription}:acknowledge")) + .auth + .bearer(await(getToken())) + .body( + JsObject("ackIds" -> JsArray(messageIds.toVector.map(m => JsString(m.ackId)))).compactPrint + ) + await(request.send()).unsafeBody + () + } + +} + +object GoogleBus { + + private object Protocol extends DefaultJsonProtocol { + case class SubscriptionPull(receivedMessages: Seq[ReceivedMessage]) + case class ReceivedMessage(ackId: String, message: PubsubMessage) + case class PubsubMessage(data: ByteBuffer, publishTime: Instant) + + implicit val timeFormat: JsonFormat[Instant] = new JsonFormat[Instant] { + override def write(obj: Instant): JsValue = JsString(obj.toString) + override def read(json: JsValue): Instant = Instant.parse(json.convertTo[String]) + } + implicit val bufferFormat: JsonFormat[ByteBuffer] = new JsonFormat[ByteBuffer] { + override def write(obj: ByteBuffer): JsValue = + JsString(util.Base64.getEncoder.encodeToString(obj.array())) + + override def read(json: JsValue): ByteBuffer = { + val encodedBytes = json.convertTo[String].getBytes("utf-8") + val decodedBytes = util.Base64.getDecoder.decode(encodedBytes) + ByteBuffer.wrap(decodedBytes) + } + } + + implicit val pubsubMessageFormat: RootJsonFormat[PubsubMessage] = jsonFormat2(PubsubMessage) + implicit val receivedMessageFormat: RootJsonFormat[ReceivedMessage] = jsonFormat2(ReceivedMessage) + implicit val subscrptionPullFormat: RootJsonFormat[SubscriptionPull] = jsonFormat1(SubscriptionPull) + } + + def fromKeyfile(keyfile: Path, namespace: String)( + implicit executionContext: ExecutionContext, + backend: SttpBackend[Future, _]): GoogleBus = { + val creds = ServiceAccountCredentials.fromStream(Files.newInputStream(keyfile)) + new GoogleBus(creds, namespace) + } + + @deprecated( + "Reading from the environment adds opaque dependencies and hance leads to extra complexity. Use fromKeyfile instead.", + "driver-core 1.12.2") + def fromEnv(implicit executionContext: ExecutionContext, backend: SttpBackend[Future, _]): GoogleBus = { + def env(key: String) = { + require(sys.env.contains(key), s"Environment variable $key is not set.") + sys.env(key) + } + val keyfile = Paths.get(env("GOOGLE_APPLICATION_CREDENTIALS")) + fromKeyfile(keyfile, env("SERVICE_NAMESPACE")) + } + +} diff --git a/core-messaging/src/main/scala/xyz/driver/core/messaging/QueueBus.scala b/core-messaging/src/main/scala/xyz/driver/core/messaging/QueueBus.scala new file mode 100644 index 0000000..45c9ed5 --- /dev/null +++ b/core-messaging/src/main/scala/xyz/driver/core/messaging/QueueBus.scala @@ -0,0 +1,126 @@ +package xyz.driver.core +package messaging + +import java.nio.ByteBuffer + +import akka.actor.{Actor, ActorSystem, Props} + +import scala.collection.mutable +import scala.collection.mutable.Map +import scala.concurrent.duration._ +import scala.concurrent.{Future, Promise} + +/** A bus backed by an asynchronous queue. Note that this bus requires a local actor system + * and is intended for testing purposes only. */ +class QueueBus(implicit system: ActorSystem) extends Bus { + import system.dispatcher + + override type SubscriptionConfig = Long + override val defaultSubscriptionConfig: Long = 0 + override val executionContext = system.dispatcher + + override type MessageId = (String, SubscriptionConfig, Long) + type Message[A] = BasicMessage[A] + + private object ActorMessages { + case class Send[A](topic: Topic[A], messages: Seq[A]) + case class Ack(messages: Seq[MessageId]) + case class Fetch[A](topic: Topic[A], cfg: SubscriptionConfig, max: Int, response: Promise[Seq[Message[A]]]) + case object Unack + } + + class Subscription { + val mailbox: mutable.Map[MessageId, ByteBuffer] = mutable.Map.empty + val unacked: mutable.Map[MessageId, ByteBuffer] = mutable.Map.empty + } + + private class BusMaster extends Actor { + var _id = 0L + def nextId(topic: String, cfg: SubscriptionConfig): (String, SubscriptionConfig, Long) = { + _id += 1; (topic, cfg, _id) + } + + val topics: mutable.Map[String, mutable.Map[SubscriptionConfig, Subscription]] = mutable.Map.empty + + def ensureSubscription(topic: String, cfg: SubscriptionConfig): Unit = { + topics.get(topic) match { + case Some(t) => + t.getOrElseUpdate(cfg, new Subscription) + case None => + topics += topic -> mutable.Map.empty + ensureSubscription(topic, cfg) + } + } + + override def preStart(): Unit = { + context.system.scheduler.schedule(1.seconds, 1.seconds) { + self ! ActorMessages.Unack + } + } + + override def receive: Receive = { + case ActorMessages.Send(topic, messages) => + val buffers = messages.map(topic.serialize) + val subscriptions = topics.getOrElse(topic.name, Map.empty) + for ((cfg, subscription) <- subscriptions) { + for (buffer <- buffers) { + subscription.mailbox += nextId(topic.name, cfg) -> buffer + } + } + + case ActorMessages.Fetch(topic, cfg, max, promise) => + ensureSubscription(topic.name, cfg) + val subscription = topics(topic.name)(cfg) + val messages = subscription.mailbox.take(max) + subscription.unacked ++= messages + subscription.mailbox --= messages.map(_._1) + promise.success(messages.toSeq.map { + case (ackId, buffer) => + new Message[Any] { + val id = ackId + val data = topic.deserialize(buffer) + } + }) + + case ActorMessages.Ack(messageIds) => + for (id @ (topic, cfg, _) <- messageIds) { + ensureSubscription(topic, cfg) + val subscription = topics(topic)(cfg) + subscription.unacked -= id + } + + case ActorMessages.Unack => + for ((_, subscriptions) <- topics) { + for ((_, subscription) <- subscriptions) { + subscription.mailbox ++= subscription.unacked + subscription.unacked.clear() + } + } + } + + } + + val actor = system.actorOf(Props(new BusMaster)) + + override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = Future { + actor ! ActorMessages.Send(topic, messages) + } + + override def fetchMessages[A]( + topic: messaging.Topic[A], + config: SubscriptionConfig, + maxMessages: Int): Future[Seq[Message[A]]] = { + val result = Promise[Seq[Message[A]]] + actor ! ActorMessages.Fetch(topic, config, maxMessages, result) + result.future + } + + override def acknowledgeMessages(ids: Seq[MessageId]): Future[Unit] = Future { + actor ! ActorMessages.Ack(ids) + } + +} + +object QueueBus { + def apply(implicit system: ActorSystem) = new QueueBus +} diff --git a/core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala b/core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala new file mode 100644 index 0000000..44d75cd --- /dev/null +++ b/core-messaging/src/main/scala/xyz/driver/core/messaging/StreamBus.scala @@ -0,0 +1,102 @@ +package xyz.driver.core +package messaging + +import akka.NotUsed +import akka.stream.Materializer +import akka.stream.scaladsl.{Flow, RestartSource, Sink, Source} + +import scala.collection.mutable.ListBuffer +import scala.concurrent.Future +import scala.concurrent.duration._ + +/** An extension to message buses that offers an Akka-Streams API. + * + * Example usage of a stream that subscribes to one topic, prints any messages + * it receives and finally acknowledges them + * their receipt. + * {{{ + * val bus: StreamBus = ??? + * val topic = Topic.string("topic") + * bus.subscribe(topic1) + * .map{ msg => + * print(msg.data) + * msg + * } + * .to(bus.acknowledge) + * .run() + * }}} */ +trait StreamBus extends Bus { + + /** Flow that publishes any messages to a given topic. + * Emits messages once they have been published to the underlying bus. */ + def publish[A](topic: Topic[A]): Flow[A, A, NotUsed] = { + Flow[A] + .batch(defaultMaxMessages.toLong, a => ListBuffer[A](a))(_ += _) + .mapAsync(1) { a => + publishMessages(topic, a).map(_ => a) + } + .mapConcat(list => list.toList) + } + + /** Sink that acknowledges the receipt of a message. */ + def acknowledge: Sink[MessageId, NotUsed] = { + Flow[MessageId] + .batch(defaultMaxMessages.toLong, a => ListBuffer[MessageId](a))(_ += _) + .mapAsync(1)(acknowledgeMessages(_)) + .to(Sink.ignore) + } + + /** Source that listens to a subscription and receives any messages sent to its topic. */ + def subscribe[A]( + topic: Topic[A], + config: SubscriptionConfig = defaultSubscriptionConfig): Source[Message[A], NotUsed] = { + Source + .unfoldAsync((topic, config))( + topicAndConfig => + fetchMessages(topicAndConfig._1, topicAndConfig._2, defaultMaxMessages).map(msgs => + Some(topicAndConfig -> msgs)) + ) + .filter(_.nonEmpty) + .mapConcat(messages => messages.toList) + } + + def runWithRestart[A]( + topic: Topic[A], + config: SubscriptionConfig = defaultSubscriptionConfig, + minBackoff: FiniteDuration = 3.seconds, + maxBackoff: FiniteDuration = 30.seconds, + randomFactor: Double = 0.2, + maxRestarts: Int = 20 + )(processMessage: Flow[Message[A], List[MessageId], NotUsed])(implicit mat: Materializer): NotUsed = { + RestartSource + .withBackoff[MessageId]( + minBackoff, + maxBackoff, + randomFactor, + maxRestarts + ) { () => + subscribe(topic, config) + .via(processMessage) + .log(topic.name) + .mapConcat(identity) + } + .to(acknowledge) + .run() + } + + def handleMessage[A]( + topic: Topic[A], + config: SubscriptionConfig = defaultSubscriptionConfig, + parallelism: Int = 1, + minBackoff: FiniteDuration = 3.seconds, + maxBackoff: FiniteDuration = 30.seconds, + randomFactor: Double = 0.2, + maxRestarts: Int = 20 + )(processMessage: A => Future[_])(implicit mat: Materializer): NotUsed = { + runWithRestart(topic, config, minBackoff, maxBackoff, randomFactor, maxRestarts) { + Flow[Message[A]].mapAsync(parallelism) { message => + processMessage(message.data).map(_ => message.id :: Nil) + } + } + } +} diff --git a/core-messaging/src/main/scala/xyz/driver/core/messaging/Topic.scala b/core-messaging/src/main/scala/xyz/driver/core/messaging/Topic.scala new file mode 100644 index 0000000..32fd764 --- /dev/null +++ b/core-messaging/src/main/scala/xyz/driver/core/messaging/Topic.scala @@ -0,0 +1,43 @@ +package xyz.driver.core +package messaging + +import java.nio.ByteBuffer + +/** A topic is a named group of messages that all share a common schema. + * @tparam Message type of messages sent over this topic */ +trait Topic[Message] { + + /** Name of this topic (must be unique). */ + def name: String + + /** Convert a message to its wire format that will be sent over a bus. */ + def serialize(message: Message): ByteBuffer + + /** Convert a message from its wire format. */ + def deserialize(message: ByteBuffer): Message + +} + +object Topic { + + /** Create a new "raw" topic without a schema, providing access to the underlying bytes of messages. */ + def raw(name0: String): Topic[ByteBuffer] = new Topic[ByteBuffer] { + def name = name0 + override def serialize(message: ByteBuffer): ByteBuffer = message + override def deserialize(message: ByteBuffer): ByteBuffer = message + } + + /** Create a topic that represents data as UTF-8 encoded strings. */ + def string(name0: String): Topic[String] = new Topic[String] { + def name = name0 + override def serialize(message: String): ByteBuffer = { + ByteBuffer.wrap(message.getBytes("utf-8")) + } + override def deserialize(message: ByteBuffer): String = { + val bytes = new Array[Byte](message.remaining()) + message.get(bytes) + new String(bytes, "utf-8") + } + } + +} diff --git a/core-messaging/src/test/scala/xyz/driver/core/messaging/QueueBusTest.scala b/core-messaging/src/test/scala/xyz/driver/core/messaging/QueueBusTest.scala new file mode 100644 index 0000000..8dd0776 --- /dev/null +++ b/core-messaging/src/test/scala/xyz/driver/core/messaging/QueueBusTest.scala @@ -0,0 +1,30 @@ +package xyz.driver.core.messaging + +import akka.actor.ActorSystem +import org.scalatest.FlatSpec +import org.scalatest.concurrent.ScalaFutures + +import scala.concurrent.ExecutionContext +import scala.concurrent.duration._ + +class QueueBusTest extends FlatSpec with ScalaFutures { + implicit val patience: PatienceConfig = PatienceConfig(timeout = 10.seconds) + + def busBehaviour(bus: Bus)(implicit ec: ExecutionContext): Unit = { + + it should "deliver messages to a subscriber" in { + val topic = Topic.string("test.topic1") + bus.fetchMessages(topic).futureValue + bus.publishMessages(topic, Seq("hello world!")) + Thread.sleep(100) + val messages = bus.fetchMessages(topic) + assert(messages.futureValue.map(_.data).toList == List("hello world!")) + } + } + + implicit val system: ActorSystem = ActorSystem("queue-test") + import system.dispatcher + + "A queue-based bus" should behave like busBehaviour(new QueueBus) + +} diff --git a/core-storage/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala b/core-storage/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala new file mode 100644 index 0000000..7e59df4 --- /dev/null +++ b/core-storage/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala @@ -0,0 +1,108 @@ +package xyz.driver.core.storage + +import java.io.ByteArrayInputStream +import java.net.URL +import java.nio.file.Path +import java.time.Clock +import java.util.Date + +import akka.Done +import akka.stream.scaladsl.{Sink, Source, StreamConverters} +import akka.util.ByteString +import com.aliyun.oss.OSSClient +import com.aliyun.oss.model.ObjectPermission +import com.typesafe.config.Config + +import scala.collection.JavaConverters._ +import scala.concurrent.duration.Duration +import scala.concurrent.{ExecutionContext, Future} + +class AliyunBlobStorage( + client: OSSClient, + bucketId: String, + clock: Clock, + chunkSize: Int = AliyunBlobStorage.DefaultChunkSize)(implicit ec: ExecutionContext) + extends SignedBlobStorage { + override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future { + client.putObject(bucketId, name, new ByteArrayInputStream(content)) + name + } + + override def uploadFile(name: String, content: Path): Future[String] = Future { + client.putObject(bucketId, name, content.toFile) + name + } + + override def exists(name: String): Future[Boolean] = Future { + client.doesObjectExist(bucketId, name) + } + + override def list(prefix: String): Future[Set[String]] = Future { + client.listObjects(bucketId, prefix).getObjectSummaries.asScala.map(_.getKey)(collection.breakOut) + } + + override def content(name: String): Future[Option[Array[Byte]]] = Future { + Option(client.getObject(bucketId, name)).map { obj => + val inputStream = obj.getObjectContent + Stream.continually(inputStream.read).takeWhile(_ != -1).map(_.toByte).toArray + } + } + + override def download(name: String): Future[Option[Source[ByteString, Any]]] = Future { + Option(client.getObject(bucketId, name)).map { obj => + StreamConverters.fromInputStream(() => obj.getObjectContent, chunkSize) + } + } + + override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future { + StreamConverters + .asInputStream() + .mapMaterializedValue(is => + Future { + client.putObject(bucketId, name, is) + Done + }) + } + + override def delete(name: String): Future[String] = Future { + client.deleteObject(bucketId, name) + name + } + + override def url(name: String): Future[Option[URL]] = Future { + // Based on https://www.alibabacloud.com/help/faq-detail/39607.htm + Option(client.getObjectAcl(bucketId, name)).map { acl => + val isPrivate = acl.getPermission == ObjectPermission.Private + val bucket = client.getBucketInfo(bucketId).getBucket + val endpointUrl = if (isPrivate) bucket.getIntranetEndpoint else bucket.getExtranetEndpoint + new URL(s"https://$bucketId.$endpointUrl/$name") + } + } + + override def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]] = Future { + if (client.doesObjectExist(bucketId, name)) { + val expiration = new Date(clock.millis() + duration.toMillis) + Some(client.generatePresignedUrl(bucketId, name, expiration)) + } else { + None + } + } +} + +object AliyunBlobStorage { + val DefaultChunkSize: Int = 8192 + + def apply(config: Config, bucketId: String, clock: Clock)( + implicit ec: ExecutionContext): AliyunBlobStorage = { + val clientId = config.getString("storage.aliyun.clientId") + val clientSecret = config.getString("storage.aliyun.clientSecret") + val endpoint = config.getString("storage.aliyun.endpoint") + this(clientId, clientSecret, endpoint, bucketId, clock) + } + + def apply(clientId: String, clientSecret: String, endpoint: String, bucketId: String, clock: Clock)( + implicit ec: ExecutionContext): AliyunBlobStorage = { + val client = new OSSClient(endpoint, clientId, clientSecret) + new AliyunBlobStorage(client, bucketId, clock) + } +} diff --git a/core-storage/src/main/scala/xyz/driver/core/storage/BlobStorage.scala b/core-storage/src/main/scala/xyz/driver/core/storage/BlobStorage.scala new file mode 100644 index 0000000..0cde96a --- /dev/null +++ b/core-storage/src/main/scala/xyz/driver/core/storage/BlobStorage.scala @@ -0,0 +1,50 @@ +package xyz.driver.core.storage + +import java.net.URL +import java.nio.file.Path + +import akka.Done +import akka.stream.scaladsl.{Sink, Source} +import akka.util.ByteString + +import scala.concurrent.Future +import scala.concurrent.duration.Duration + +/** Binary key-value store, typically implemented by cloud storage. */ +trait BlobStorage { + + /** Upload data by value. */ + def uploadContent(name: String, content: Array[Byte]): Future[String] + + /** Upload data from an existing file. */ + def uploadFile(name: String, content: Path): Future[String] + + def exists(name: String): Future[Boolean] + + /** List available keys. The prefix determines which keys should be listed + * and depends on the implementation (for instance, a file system backed + * blob store will treat a prefix as a directory path). */ + def list(prefix: String): Future[Set[String]] + + /** Get all the content of a given object. */ + def content(name: String): Future[Option[Array[Byte]]] + + /** Stream data asynchronously and with backpressure. */ + def download(name: String): Future[Option[Source[ByteString, Any]]] + + /** Get a sink to upload data. */ + def upload(name: String): Future[Sink[ByteString, Future[Done]]] + + /** Delete a stored value. */ + def delete(name: String): Future[String] + + /** + * Path to specified resource. Checks that the resource exists and returns None if + * it is not found. Depending on the implementation, may throw. + */ + def url(name: String): Future[Option[URL]] +} + +trait SignedBlobStorage extends BlobStorage { + def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]] +} diff --git a/core-storage/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala b/core-storage/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala new file mode 100644 index 0000000..e12c73d --- /dev/null +++ b/core-storage/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala @@ -0,0 +1,82 @@ +package xyz.driver.core.storage + +import java.net.URL +import java.nio.file.{Files, Path, StandardCopyOption} + +import akka.stream.scaladsl.{FileIO, Sink, Source} +import akka.util.ByteString +import akka.{Done, NotUsed} + +import scala.collection.JavaConverters._ +import scala.concurrent.{ExecutionContext, Future} + +/** A blob store that is backed by a local filesystem. All objects are stored relative to the given + * root path. Slashes ('/') in blob names are treated as usual path separators and are converted + * to directories. */ +class FileSystemBlobStorage(root: Path)(implicit ec: ExecutionContext) extends BlobStorage { + + private def ensureParents(file: Path): Path = { + Files.createDirectories(file.getParent()) + file + } + + private def file(name: String) = root.resolve(name) + + override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future { + Files.write(ensureParents(file(name)), content) + name + } + override def uploadFile(name: String, content: Path): Future[String] = Future { + Files.copy(content, ensureParents(file(name)), StandardCopyOption.REPLACE_EXISTING) + name + } + + override def exists(name: String): Future[Boolean] = Future { + val path = file(name) + Files.exists(path) && Files.isReadable(path) + } + + override def list(prefix: String): Future[Set[String]] = Future { + val dir = file(prefix) + Files + .list(dir) + .iterator() + .asScala + .map(p => root.relativize(p)) + .map(_.toString) + .toSet + } + + override def content(name: String): Future[Option[Array[Byte]]] = exists(name) map { + case true => + Some(Files.readAllBytes(file(name))) + case false => None + } + + override def download(name: String): Future[Option[Source[ByteString, NotUsed]]] = Future { + if (Files.exists(file(name))) { + Some(FileIO.fromPath(file(name)).mapMaterializedValue(_ => NotUsed)) + } else { + None + } + } + + override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future { + val f = ensureParents(file(name)) + FileIO.toPath(f).mapMaterializedValue(_.map(_ => Done)) + } + + override def delete(name: String): Future[String] = exists(name).map { e => + if (e) { + Files.delete(file(name)) + } + name + } + + override def url(name: String): Future[Option[URL]] = exists(name) map { + case true => + Some(root.resolve(name).toUri.toURL) + case false => + None + } +} diff --git a/core-storage/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala b/core-storage/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala new file mode 100644 index 0000000..95164c7 --- /dev/null +++ b/core-storage/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala @@ -0,0 +1,96 @@ +package xyz.driver.core.storage + +import java.io.{FileInputStream, InputStream} +import java.net.URL +import java.nio.file.Path + +import akka.Done +import akka.stream.scaladsl.Sink +import akka.util.ByteString +import com.google.api.gax.paging.Page +import com.google.auth.oauth2.ServiceAccountCredentials +import com.google.cloud.storage.Storage.BlobListOption +import com.google.cloud.storage.{Blob, BlobId, Bucket, Storage, StorageOptions} + +import scala.collection.JavaConverters._ +import scala.concurrent.duration.Duration +import scala.concurrent.{ExecutionContext, Future} + +class GcsBlobStorage(client: Storage, bucketId: String, chunkSize: Int = GcsBlobStorage.DefaultChunkSize)( + implicit ec: ExecutionContext) + extends BlobStorage with SignedBlobStorage { + + private val bucket: Bucket = client.get(bucketId) + require(bucket != null, s"Bucket $bucketId does not exist.") + + override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future { + bucket.create(name, content).getBlobId.getName + } + + override def uploadFile(name: String, content: Path): Future[String] = Future { + bucket.create(name, new FileInputStream(content.toFile)).getBlobId.getName + } + + override def exists(name: String): Future[Boolean] = Future { + bucket.get(name) != null + } + + override def list(prefix: String): Future[Set[String]] = Future { + val page: Page[Blob] = bucket.list(BlobListOption.prefix(prefix)) + page + .iterateAll() + .asScala + .map(_.getName()) + .toSet + } + + override def content(name: String): Future[Option[Array[Byte]]] = Future { + Option(bucket.get(name)).map(blob => blob.getContent()) + } + + override def download(name: String) = Future { + Option(bucket.get(name)).map { blob => + ChannelStream.fromChannel(() => blob.reader(), chunkSize) + } + } + + override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future { + val blob = bucket.create(name, Array.emptyByteArray) + ChannelStream.toChannel(() => blob.writer(), chunkSize) + } + + override def delete(name: String): Future[String] = Future { + client.delete(BlobId.of(bucketId, name)) + name + } + + override def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]] = Future { + Option(bucket.get(name)).map(blob => blob.signUrl(duration.length, duration.unit)) + } + + override def url(name: String): Future[Option[URL]] = Future { + val protocol: String = "https" + val resourcePath: String = s"storage.googleapis.com/${bucket.getName}/" + Option(bucket.get(name)).map { blob => + new URL(protocol, resourcePath, blob.getName) + } + } +} + +object GcsBlobStorage { + final val DefaultChunkSize = 8192 + + private def newClient(key: InputStream): Storage = + StorageOptions + .newBuilder() + .setCredentials(ServiceAccountCredentials.fromStream(key)) + .build() + .getService() + + def fromKeyfile(keyfile: Path, bucketId: String, chunkSize: Int = DefaultChunkSize)( + implicit ec: ExecutionContext): GcsBlobStorage = { + val client = newClient(new FileInputStream(keyfile.toFile)) + new GcsBlobStorage(client, bucketId, chunkSize) + } + +} diff --git a/core-storage/src/main/scala/xyz/driver/core/storage/channelStreams.scala b/core-storage/src/main/scala/xyz/driver/core/storage/channelStreams.scala new file mode 100644 index 0000000..fc652be --- /dev/null +++ b/core-storage/src/main/scala/xyz/driver/core/storage/channelStreams.scala @@ -0,0 +1,112 @@ +package xyz.driver.core.storage + +import java.nio.ByteBuffer +import java.nio.channels.{ReadableByteChannel, WritableByteChannel} + +import akka.stream._ +import akka.stream.scaladsl.{Sink, Source} +import akka.stream.stage._ +import akka.util.ByteString +import akka.{Done, NotUsed} + +import scala.concurrent.{Future, Promise} +import scala.util.control.NonFatal + +class ChannelSource(createChannel: () => ReadableByteChannel, chunkSize: Int) + extends GraphStage[SourceShape[ByteString]] { + + val out = Outlet[ByteString]("ChannelSource.out") + val shape = SourceShape(out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + val channel = createChannel() + + object Handler extends OutHandler { + override def onPull(): Unit = { + try { + val buffer = ByteBuffer.allocate(chunkSize) + if (channel.read(buffer) > 0) { + buffer.flip() + push(out, ByteString.fromByteBuffer(buffer)) + } else { + completeStage() + } + } catch { + case NonFatal(_) => + channel.close() + } + } + override def onDownstreamFinish(): Unit = { + channel.close() + } + } + + setHandler(out, Handler) + } + +} + +class ChannelSink(createChannel: () => WritableByteChannel, chunkSize: Int) + extends GraphStageWithMaterializedValue[SinkShape[ByteString], Future[Done]] { + + val in = Inlet[ByteString]("ChannelSink.in") + val shape = SinkShape(in) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = { + val promise = Promise[Done]() + val logic = new GraphStageLogic(shape) { + val channel = createChannel() + + object Handler extends InHandler { + override def onPush(): Unit = { + try { + val data = grab(in) + channel.write(data.asByteBuffer) + pull(in) + } catch { + case NonFatal(e) => + channel.close() + promise.failure(e) + } + } + + override def onUpstreamFinish(): Unit = { + channel.close() + completeStage() + promise.success(Done) + } + + override def onUpstreamFailure(ex: Throwable): Unit = { + channel.close() + promise.failure(ex) + } + } + + setHandler(in, Handler) + + override def preStart(): Unit = { + pull(in) + } + } + (logic, promise.future) + } + +} + +object ChannelStream { + + def fromChannel(channel: () => ReadableByteChannel, chunkSize: Int = 8192): Source[ByteString, NotUsed] = { + Source + .fromGraph(new ChannelSource(channel, chunkSize)) + .withAttributes(Attributes(ActorAttributes.IODispatcher)) + .async + } + + def toChannel(channel: () => WritableByteChannel, chunkSize: Int = 8192): Sink[ByteString, Future[Done]] = { + Sink + .fromGraph(new ChannelSink(channel, chunkSize)) + .withAttributes(Attributes(ActorAttributes.IODispatcher)) + .async + } + +} diff --git a/core-storage/src/test/scala/xyz/driver/core/BlobStorageTest.scala b/core-storage/src/test/scala/xyz/driver/core/BlobStorageTest.scala new file mode 100644 index 0000000..811cc60 --- /dev/null +++ b/core-storage/src/test/scala/xyz/driver/core/BlobStorageTest.scala @@ -0,0 +1,94 @@ +package xyz.driver.core + +import java.nio.file.Files + +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import akka.stream.scaladsl._ +import akka.util.ByteString +import org.scalatest._ +import org.scalatest.concurrent.ScalaFutures +import xyz.driver.core.storage.{BlobStorage, FileSystemBlobStorage} + +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.language.postfixOps + +class BlobStorageTest extends FlatSpec with ScalaFutures { + + implicit val patientce = PatienceConfig(timeout = 100.seconds) + + implicit val system = ActorSystem("blobstorage-test") + implicit val mat = ActorMaterializer() + import system.dispatcher + + def storageBehaviour(storage: BlobStorage) = { + val key = "foo" + val data = "hello world".getBytes + it should "upload data" in { + assert(storage.exists(key).futureValue === false) + assert(storage.uploadContent(key, data).futureValue === key) + assert(storage.exists(key).futureValue === true) + } + it should "download data" in { + val content = storage.content(key).futureValue + assert(content.isDefined) + assert(content.get === data) + } + it should "not download non-existing data" in { + assert(storage.content("bar").futureValue.isEmpty) + } + it should "overwrite an existing key" in { + val newData = "new string".getBytes("utf-8") + assert(storage.uploadContent(key, newData).futureValue === key) + assert(storage.content(key).futureValue.get === newData) + } + it should "upload a file" in { + val tmp = Files.createTempFile("testfile", "txt") + Files.write(tmp, data) + assert(storage.uploadFile(key, tmp).futureValue === key) + Files.delete(tmp) + } + it should "upload content" in { + val text = "foobar" + val src = Source + .single(text) + .map(l => ByteString(l)) + src.runWith(storage.upload(key).futureValue).futureValue + assert(storage.content(key).futureValue.map(_.toSeq) === Some("foobar".getBytes.toSeq)) + } + it should "delete content" in { + assert(storage.exists(key).futureValue) + storage.delete(key).futureValue + assert(!storage.exists(key).futureValue) + } + it should "download content" in { + storage.uploadContent(key, data) futureValue + val srcOpt = storage.download(key).futureValue + assert(srcOpt.isDefined) + val src = srcOpt.get + val content: Future[Array[Byte]] = src.runWith(Sink.fold(Array[Byte]())(_ ++ _)) + assert(content.futureValue === data) + } + it should "list keys" in { + assert(storage.list("").futureValue === Set(key)) + storage.uploadContent("a/a.txt", data).futureValue + storage.uploadContent("a/b", data).futureValue + storage.uploadContent("c/d", data).futureValue + storage.uploadContent("d", data).futureValue + assert(storage.list("").futureValue === Set(key, "a", "c", "d")) + assert(storage.list("a").futureValue === Set("a/a.txt", "a/b")) + assert(storage.list("a").futureValue === Set("a/a.txt", "a/b")) + assert(storage.list("c").futureValue === Set("c/d")) + } + it should "get valid URL" in { + assert(storage.exists(key).futureValue === true) + val fooUrl = storage.url(key).futureValue + assert(fooUrl.isDefined) + } + } + + "File system storage" should behave like storageBehaviour( + new FileSystemBlobStorage(Files.createTempDirectory("test"))) + +} diff --git a/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala b/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala deleted file mode 100644 index 8b7bca7..0000000 --- a/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala +++ /dev/null @@ -1,157 +0,0 @@ -package xyz.driver.core.messaging -import java.nio.ByteBuffer -import java.util - -import com.aliyun.mns.client.{AsyncCallback, CloudAccount} -import com.aliyun.mns.common.ServiceException -import com.aliyun.mns.model -import com.aliyun.mns.model._ - -import scala.concurrent.duration._ -import scala.concurrent.{ExecutionContext, Future, Promise} - -class AliyunBus( - accountId: String, - accessId: String, - accessSecret: String, - region: String, - namespace: String, - pullTimeout: Int -)(implicit val executionContext: ExecutionContext) - extends Bus { - private val endpoint = s"https://$accountId.mns.$region.aliyuncs.com" - private val cloudAccount = new CloudAccount(accessId, accessSecret, endpoint) - private val client = cloudAccount.getMNSClient - - // When calling the asyncBatchPopMessage endpoint, alicloud returns an error if no message is received before the - // pullTimeout. This error is documented as MessageNotExist, however it's been observed to return InternalServerError - // occasionally. We mask both of these errors and return an empty list of messages - private val MaskedErrorCodes: Set[String] = Set("MessageNotExist", "InternalServerError") - - override val defaultMaxMessages: Int = 10 - - case class MessageId(queueName: String, messageHandle: String) - - case class Message[A](id: MessageId, data: A) extends BasicMessage[A] - - case class SubscriptionConfig( - subscriptionPrefix: String = accessId, - ackTimeout: FiniteDuration = 10.seconds - ) - - override val defaultSubscriptionConfig: SubscriptionConfig = SubscriptionConfig() - - private def rawTopicName(topic: Topic[_]) = - s"$namespace-${topic.name}" - private def rawSubscriptionName(config: SubscriptionConfig, topic: Topic[_]) = - s"$namespace-${config.subscriptionPrefix}-${topic.name}" - - override def fetchMessages[A]( - topic: Topic[A], - config: SubscriptionConfig, - maxMessages: Int): Future[Seq[Message[A]]] = { - import collection.JavaConverters._ - val subscriptionName = rawSubscriptionName(config, topic) - val queueRef = client.getQueueRef(subscriptionName) - - val promise = Promise[Seq[model.Message]] - queueRef.asyncBatchPopMessage( - maxMessages, - pullTimeout, - new AsyncCallback[util.List[model.Message]] { - override def onSuccess(result: util.List[model.Message]): Unit = { - promise.success(result.asScala) - } - override def onFail(ex: Exception): Unit = ex match { - case serviceException: ServiceException if MaskedErrorCodes(serviceException.getErrorCode) => - promise.success(Nil) - case _ => - promise.failure(ex) - } - } - ) - - promise.future.map(_.map { message => - import scala.xml.XML - val messageId = MessageId(subscriptionName, message.getReceiptHandle) - val messageXML = XML.loadString(message.getMessageBodyAsRawString) - val messageNode = messageXML \ "Message" - val messageBytes = java.util.Base64.getDecoder.decode(messageNode.head.text) - - val deserializedMessage = topic.deserialize(ByteBuffer.wrap(messageBytes)) - Message(messageId, deserializedMessage) - }) - } - - override def acknowledgeMessages(messages: Seq[MessageId]): Future[Unit] = { - import collection.JavaConverters._ - require(messages.nonEmpty, "Acknowledged message list must be non-empty") - - val queueRef = client.getQueueRef(messages.head.queueName) - - val promise = Promise[Unit] - queueRef.asyncBatchDeleteMessage( - messages.map(_.messageHandle).asJava, - new AsyncCallback[Void] { - override def onSuccess(result: Void): Unit = promise.success(()) - override def onFail(ex: Exception): Unit = promise.failure(ex) - } - ) - - promise.future - } - - override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = { - val topicRef = client.getTopicRef(rawTopicName(topic)) - - val publishMessages = messages.map { message => - val promise = Promise[TopicMessage] - - val topicMessage = new Base64TopicMessage - topicMessage.setMessageBody(topic.serialize(message).array()) - - topicRef.asyncPublishMessage( - topicMessage, - new AsyncCallback[TopicMessage] { - override def onSuccess(result: TopicMessage): Unit = promise.success(result) - override def onFail(ex: Exception): Unit = promise.failure(ex) - } - ) - - promise.future - } - - Future.sequence(publishMessages).map(_ => ()) - } - - def createTopic(topic: Topic[_]): Future[Unit] = Future { - val topicName = rawTopicName(topic) - val topicExists = Option(client.listTopic(topicName, "", 1)).exists(!_.getResult.isEmpty) - if (!topicExists) { - val topicMeta = new TopicMeta - topicMeta.setTopicName(topicName) - client.createTopic(topicMeta) - } - } - - def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = Future { - val subscriptionName = rawSubscriptionName(config, topic) - val queueExists = Option(client.listQueue(subscriptionName, "", 1)).exists(!_.getResult.isEmpty) - - if (!queueExists) { - val topicName = rawTopicName(topic) - val topicRef = client.getTopicRef(topicName) - - val queueMeta = new QueueMeta - queueMeta.setQueueName(subscriptionName) - queueMeta.setVisibilityTimeout(config.ackTimeout.toSeconds) - client.createQueue(queueMeta) - - val subscriptionMeta = new SubscriptionMeta - subscriptionMeta.setSubscriptionName(subscriptionName) - subscriptionMeta.setTopicName(topicName) - subscriptionMeta.setEndpoint(topicRef.generateQueueEndpoint(subscriptionName)) - topicRef.subscribe(subscriptionMeta) - } - } -} diff --git a/src/main/scala/xyz/driver/core/messaging/Bus.scala b/src/main/scala/xyz/driver/core/messaging/Bus.scala deleted file mode 100644 index 75954f4..0000000 --- a/src/main/scala/xyz/driver/core/messaging/Bus.scala +++ /dev/null @@ -1,74 +0,0 @@ -package xyz.driver.core -package messaging - -import scala.concurrent._ -import scala.language.higherKinds - -/** Base trait for representing message buses. - * - * Message buses are expected to provide "at least once" delivery semantics and are - * expected to retry delivery when a message remains unacknowledged for a reasonable - * amount of time. */ -trait Bus { - - /** Type of unique message identifiers. Usually a string or UUID. */ - type MessageId - - /** Most general kind of message. Any implementation of a message bus must provide - * the fields and methods specified in this trait. */ - trait BasicMessage[A] { self: Message[A] => - - /** All messages must have unique IDs so that they can be acknowledged unambiguously. */ - def id: MessageId - - /** Actual message content. */ - def data: A - - } - - /** Actual type of messages provided by this bus. This must be a subtype of BasicMessage - * (as that defines the minimal required fields of a messages), but may be refined to - * provide bus-specific additional data. */ - type Message[A] <: BasicMessage[A] - - /** Type of a bus-specific configuration object can be used to tweak settings of subscriptions. */ - type SubscriptionConfig - - /** Default value for a subscription configuration. It is such that any service will have a unique subscription - * for every topic, shared among all its instances. */ - val defaultSubscriptionConfig: SubscriptionConfig - - /** Maximum amount of messages handled in a single retrieval call. */ - val defaultMaxMessages = 64 - - /** Execution context that is used to query and dispatch messages from this bus. */ - implicit val executionContext: ExecutionContext - - /** Retrieve any new messages in the mailbox of a subscription. - * - * Any retrieved messages become "outstanding" and should not be returned by this function - * again until a reasonable (bus-specific) amount of time has passed and they remain unacknowledged. - * In that case, they will again be considered new and will be returned by this function. - * - * Note that although outstanding and acknowledged messages will eventually be removed from - * mailboxes, no guarantee can be made that a message will be delivered only once. */ - def fetchMessages[A]( - topic: Topic[A], - config: SubscriptionConfig = defaultSubscriptionConfig, - maxMessages: Int = defaultMaxMessages): Future[Seq[Message[A]]] - - /** Acknowledge that a given series of messages has been handled and should - * not be delivered again. - * - * Note that messages become eventually acknowledged and hence may be delivered more than once. - * @see fetchMessages() - */ - def acknowledgeMessages(messages: Seq[MessageId]): Future[Unit] - - /** Send a series of messages to a topic. - * - * The returned future will complete once messages have been accepted to the underlying bus. - * No guarantee can be made of their delivery to subscribers. */ - def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] - -} diff --git a/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala b/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala deleted file mode 100644 index 1af5308..0000000 --- a/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala +++ /dev/null @@ -1,49 +0,0 @@ -package xyz.driver.core -package messaging - -import java.util.concurrent.ConcurrentHashMap - -import scala.async.Async.{async, await} -import scala.concurrent.Future - -/** Utility mixin that will ensure that topics and subscriptions exist before - * attempting to read or write from or to them. - */ -trait CreateOnDemand extends Bus { - - /** Create the given topic. This operation is idempotent and is expected to succeed if the topic - * already exists. - */ - def createTopic(topic: Topic[_]): Future[Unit] - - /** Create the given subscription. This operation is idempotent and is expected to succeed if the subscription - * already exists. - */ - def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] - - private val createdTopics = new ConcurrentHashMap[Topic[_], Future[Unit]] - private val createdSubscriptions = new ConcurrentHashMap[(Topic[_], SubscriptionConfig), Future[Unit]] - - private def ensureTopic(topic: Topic[_]) = - createdTopics.computeIfAbsent(topic, t => createTopic(t)) - - private def ensureSubscription(topic: Topic[_], config: SubscriptionConfig) = - createdSubscriptions.computeIfAbsent(topic -> config, { - case (t, c) => createSubscription(t, c) - }) - - abstract override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = async { - await(ensureTopic(topic)) - await(super.publishMessages(topic, messages)) - } - - abstract override def fetchMessages[A]( - topic: Topic[A], - config: SubscriptionConfig, - maxMessages: Int): Future[Seq[Message[A]]] = async { - await(ensureTopic(topic)) - await(ensureSubscription(topic, config)) - await(super.fetchMessages(topic, config, maxMessages)) - } - -} diff --git a/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala b/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala deleted file mode 100644 index b296c50..0000000 --- a/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala +++ /dev/null @@ -1,267 +0,0 @@ -package xyz.driver.core -package messaging - -import java.nio.ByteBuffer -import java.nio.file.{Files, Path, Paths} -import java.security.Signature -import java.time.Instant -import java.util - -import com.google.auth.oauth2.ServiceAccountCredentials -import com.softwaremill.sttp._ -import spray.json.DefaultJsonProtocol._ -import spray.json._ - -import scala.async.Async.{async, await} -import scala.concurrent._ -import scala.concurrent.duration._ - -/** A message bus implemented by [[https://cloud.google.com/pubsub/docs/overview Google's Pub/Sub service.]] - * - * == Overview == - * - * The Pub/Sub message system is focused around a few concepts: 'topics', - * 'subscriptions' and 'subscribers'. Messages are sent to ''topics'' which may - * have multiple ''subscriptions'' associated to them. Every subscription to a - * topic will receive all messages sent to the topic. Messages are enqueued in - * a subscription until they are acknowledged by a ''subscriber''. Multiple - * subscribers may be associated to a subscription, in which case messages will - * get delivered arbitrarily among them. - * - * Topics and subscriptions are named resources which can be specified in - * Pub/Sub's configuration and may be queried. Subscribers on the other hand, - * are ephemeral processes that query a subscription on a regular basis, handle any - * messages and acknowledge them. - * - * == Delivery semantics == - * - * - at least once - * - no ordering - * - * == Retention == - * - * - configurable retry delay for unacknowledged messages, defaults to 10s - * - undeliverable messages are kept for 7 days - * - * @param credentials Google cloud credentials, usually the same as used by a - * service. Must have admin access to topics and - * descriptions. - * @param namespace The namespace in which this bus is running. Will be used to - * determine the exact name of topics and subscriptions. - * @param pullTimeout Delay after which a call to fetchMessages() will return an - * empty list, assuming that no messages have been received. - * @param executionContext Execution context to run any blocking commands. - * @param backend sttp backend used to query Pub/Sub's HTTP API - */ -class GoogleBus( - credentials: ServiceAccountCredentials, - namespace: String, - pullTimeout: Duration = 90.seconds -)(implicit val executionContext: ExecutionContext, backend: SttpBackend[Future, _]) - extends Bus { - import GoogleBus.Protocol - - case class MessageId(subscription: String, ackId: String) - - case class PubsubMessage[A](id: MessageId, data: A, publishTime: Instant) extends super.BasicMessage[A] - type Message[A] = PubsubMessage[A] - - /** Subscription-specific configuration - * - * @param subscriptionPrefix An identifier used to uniquely determine the name of Pub/Sub subscriptions. - * All messages sent to a subscription will be dispatched arbitrarily - * among any subscribers. Defaults to the email of the credentials used by this - * bus instance, thereby giving every service a unique subscription to every topic. - * To give every service instance a unique subscription, this must be changed to a - * unique value. - * @param ackTimeout Duration in which a message must be acknowledged before it is delivered again. - */ - case class SubscriptionConfig( - subscriptionPrefix: String = credentials.getClientEmail.split("@")(0), - ackTimeout: FiniteDuration = 10.seconds - ) - override val defaultSubscriptionConfig: SubscriptionConfig = SubscriptionConfig() - - /** Obtain an authentication token valid for the given duration - * https://developers.google.com/identity/protocols/OAuth2ServiceAccount - */ - private def freshAuthToken(duration: FiniteDuration): Future[String] = { - def jwt = { - val now = Instant.now().getEpochSecond - val base64 = util.Base64.getEncoder - val header = base64.encodeToString("""{"alg":"RS256","typ":"JWT"}""".getBytes("utf-8")) - val body = base64.encodeToString( - s"""|{ - | "iss": "${credentials.getClientEmail}", - | "scope": "https://www.googleapis.com/auth/pubsub", - | "aud": "https://www.googleapis.com/oauth2/v4/token", - | "exp": ${now + duration.toSeconds}, - | "iat": $now - |}""".stripMargin.getBytes("utf-8") - ) - val signer = Signature.getInstance("SHA256withRSA") - signer.initSign(credentials.getPrivateKey) - signer.update(s"$header.$body".getBytes("utf-8")) - val signature = base64.encodeToString(signer.sign()) - s"$header.$body.$signature" - } - sttp - .post(uri"https://www.googleapis.com/oauth2/v4/token") - .body( - "grant_type" -> "urn:ietf:params:oauth:grant-type:jwt-bearer", - "assertion" -> jwt - ) - .mapResponse(s => s.parseJson.asJsObject.fields("access_token").convertTo[String]) - .send() - .map(_.unsafeBody) - } - - // the token is cached a few minutes less than its validity to diminish latency of concurrent accesses at renewal time - private val getToken: () => Future[String] = Refresh.every(55.minutes)(freshAuthToken(60.minutes)) - - private val baseUri = uri"https://pubsub.googleapis.com/" - - private def rawTopicName(topic: Topic[_]) = - s"projects/${credentials.getProjectId}/topics/$namespace.${topic.name}" - private def rawSubscriptionName(config: SubscriptionConfig, topic: Topic[_]) = - s"projects/${credentials.getProjectId}/subscriptions/$namespace.${config.subscriptionPrefix}.${topic.name}" - - def createTopic(topic: Topic[_]): Future[Unit] = async { - val request = sttp - .put(baseUri.path(s"v1/${rawTopicName(topic)}")) - .auth - .bearer(await(getToken())) - val result = await(request.send()) - result.body match { - case Left(error) if result.code != 409 => // 409 <=> topic already exists, ignore it - throw new NoSuchElementException(s"Error creating topic: Status code ${result.code}: $error") - case _ => () - } - } - - def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = async { - val request = sttp - .put(baseUri.path(s"v1/${rawSubscriptionName(config, topic)}")) - .auth - .bearer(await(getToken())) - .body( - JsObject( - "topic" -> rawTopicName(topic).toJson, - "ackDeadlineSeconds" -> config.ackTimeout.toSeconds.toJson - ).compactPrint - ) - val result = await(request.send()) - result.body match { - case Left(error) if result.code != 409 => // 409 <=> subscription already exists, ignore it - throw new NoSuchElementException(s"Error creating subscription: Status code ${result.code}: $error") - case _ => () - } - } - - override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = async { - import Protocol.bufferFormat - val buffers: Seq[ByteBuffer] = messages.map(topic.serialize) - val request = sttp - .post(baseUri.path(s"v1/${rawTopicName(topic)}:publish")) - .auth - .bearer(await(getToken())) - .body( - JsObject("messages" -> buffers.map(buffer => JsObject("data" -> buffer.toJson)).toJson).compactPrint - ) - await(request.send()).unsafeBody - () - } - - override def fetchMessages[A]( - topic: Topic[A], - subscriptionConfig: SubscriptionConfig, - maxMessages: Int): Future[Seq[PubsubMessage[A]]] = async { - val subscription = rawSubscriptionName(subscriptionConfig, topic) - val request = sttp - .post(baseUri.path(s"v1/$subscription:pull")) - .auth - .bearer(await(getToken().map(x => x))) - .body( - JsObject( - "returnImmediately" -> JsFalse, - "maxMessages" -> JsNumber(maxMessages) - ).compactPrint - ) - .readTimeout(pullTimeout) - .mapResponse(_.parseJson) - - val messages = await(request.send()).unsafeBody match { - case JsObject(fields) if fields.isEmpty => Seq() - case obj => obj.convertTo[Protocol.SubscriptionPull].receivedMessages - } - - messages.map { msg => - PubsubMessage[A]( - MessageId(subscription, msg.ackId), - topic.deserialize(msg.message.data), - msg.message.publishTime - ) - } - } - - override def acknowledgeMessages(messageIds: Seq[MessageId]): Future[Unit] = async { - val request = sttp - .post(baseUri.path(s"v1/${messageIds.head.subscription}:acknowledge")) - .auth - .bearer(await(getToken())) - .body( - JsObject("ackIds" -> JsArray(messageIds.toVector.map(m => JsString(m.ackId)))).compactPrint - ) - await(request.send()).unsafeBody - () - } - -} - -object GoogleBus { - - private object Protocol extends DefaultJsonProtocol { - case class SubscriptionPull(receivedMessages: Seq[ReceivedMessage]) - case class ReceivedMessage(ackId: String, message: PubsubMessage) - case class PubsubMessage(data: ByteBuffer, publishTime: Instant) - - implicit val timeFormat: JsonFormat[Instant] = new JsonFormat[Instant] { - override def write(obj: Instant): JsValue = JsString(obj.toString) - override def read(json: JsValue): Instant = Instant.parse(json.convertTo[String]) - } - implicit val bufferFormat: JsonFormat[ByteBuffer] = new JsonFormat[ByteBuffer] { - override def write(obj: ByteBuffer): JsValue = - JsString(util.Base64.getEncoder.encodeToString(obj.array())) - - override def read(json: JsValue): ByteBuffer = { - val encodedBytes = json.convertTo[String].getBytes("utf-8") - val decodedBytes = util.Base64.getDecoder.decode(encodedBytes) - ByteBuffer.wrap(decodedBytes) - } - } - - implicit val pubsubMessageFormat: RootJsonFormat[PubsubMessage] = jsonFormat2(PubsubMessage) - implicit val receivedMessageFormat: RootJsonFormat[ReceivedMessage] = jsonFormat2(ReceivedMessage) - implicit val subscrptionPullFormat: RootJsonFormat[SubscriptionPull] = jsonFormat1(SubscriptionPull) - } - - def fromKeyfile(keyfile: Path, namespace: String)( - implicit executionContext: ExecutionContext, - backend: SttpBackend[Future, _]): GoogleBus = { - val creds = ServiceAccountCredentials.fromStream(Files.newInputStream(keyfile)) - new GoogleBus(creds, namespace) - } - - @deprecated( - "Reading from the environment adds opaque dependencies and hance leads to extra complexity. Use fromKeyfile instead.", - "driver-core 1.12.2") - def fromEnv(implicit executionContext: ExecutionContext, backend: SttpBackend[Future, _]): GoogleBus = { - def env(key: String) = { - require(sys.env.contains(key), s"Environment variable $key is not set.") - sys.env(key) - } - val keyfile = Paths.get(env("GOOGLE_APPLICATION_CREDENTIALS")) - fromKeyfile(keyfile, env("SERVICE_NAMESPACE")) - } - -} diff --git a/src/main/scala/xyz/driver/core/messaging/QueueBus.scala b/src/main/scala/xyz/driver/core/messaging/QueueBus.scala deleted file mode 100644 index 45c9ed5..0000000 --- a/src/main/scala/xyz/driver/core/messaging/QueueBus.scala +++ /dev/null @@ -1,126 +0,0 @@ -package xyz.driver.core -package messaging - -import java.nio.ByteBuffer - -import akka.actor.{Actor, ActorSystem, Props} - -import scala.collection.mutable -import scala.collection.mutable.Map -import scala.concurrent.duration._ -import scala.concurrent.{Future, Promise} - -/** A bus backed by an asynchronous queue. Note that this bus requires a local actor system - * and is intended for testing purposes only. */ -class QueueBus(implicit system: ActorSystem) extends Bus { - import system.dispatcher - - override type SubscriptionConfig = Long - override val defaultSubscriptionConfig: Long = 0 - override val executionContext = system.dispatcher - - override type MessageId = (String, SubscriptionConfig, Long) - type Message[A] = BasicMessage[A] - - private object ActorMessages { - case class Send[A](topic: Topic[A], messages: Seq[A]) - case class Ack(messages: Seq[MessageId]) - case class Fetch[A](topic: Topic[A], cfg: SubscriptionConfig, max: Int, response: Promise[Seq[Message[A]]]) - case object Unack - } - - class Subscription { - val mailbox: mutable.Map[MessageId, ByteBuffer] = mutable.Map.empty - val unacked: mutable.Map[MessageId, ByteBuffer] = mutable.Map.empty - } - - private class BusMaster extends Actor { - var _id = 0L - def nextId(topic: String, cfg: SubscriptionConfig): (String, SubscriptionConfig, Long) = { - _id += 1; (topic, cfg, _id) - } - - val topics: mutable.Map[String, mutable.Map[SubscriptionConfig, Subscription]] = mutable.Map.empty - - def ensureSubscription(topic: String, cfg: SubscriptionConfig): Unit = { - topics.get(topic) match { - case Some(t) => - t.getOrElseUpdate(cfg, new Subscription) - case None => - topics += topic -> mutable.Map.empty - ensureSubscription(topic, cfg) - } - } - - override def preStart(): Unit = { - context.system.scheduler.schedule(1.seconds, 1.seconds) { - self ! ActorMessages.Unack - } - } - - override def receive: Receive = { - case ActorMessages.Send(topic, messages) => - val buffers = messages.map(topic.serialize) - val subscriptions = topics.getOrElse(topic.name, Map.empty) - for ((cfg, subscription) <- subscriptions) { - for (buffer <- buffers) { - subscription.mailbox += nextId(topic.name, cfg) -> buffer - } - } - - case ActorMessages.Fetch(topic, cfg, max, promise) => - ensureSubscription(topic.name, cfg) - val subscription = topics(topic.name)(cfg) - val messages = subscription.mailbox.take(max) - subscription.unacked ++= messages - subscription.mailbox --= messages.map(_._1) - promise.success(messages.toSeq.map { - case (ackId, buffer) => - new Message[Any] { - val id = ackId - val data = topic.deserialize(buffer) - } - }) - - case ActorMessages.Ack(messageIds) => - for (id @ (topic, cfg, _) <- messageIds) { - ensureSubscription(topic, cfg) - val subscription = topics(topic)(cfg) - subscription.unacked -= id - } - - case ActorMessages.Unack => - for ((_, subscriptions) <- topics) { - for ((_, subscription) <- subscriptions) { - subscription.mailbox ++= subscription.unacked - subscription.unacked.clear() - } - } - } - - } - - val actor = system.actorOf(Props(new BusMaster)) - - override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = Future { - actor ! ActorMessages.Send(topic, messages) - } - - override def fetchMessages[A]( - topic: messaging.Topic[A], - config: SubscriptionConfig, - maxMessages: Int): Future[Seq[Message[A]]] = { - val result = Promise[Seq[Message[A]]] - actor ! ActorMessages.Fetch(topic, config, maxMessages, result) - result.future - } - - override def acknowledgeMessages(ids: Seq[MessageId]): Future[Unit] = Future { - actor ! ActorMessages.Ack(ids) - } - -} - -object QueueBus { - def apply(implicit system: ActorSystem) = new QueueBus -} diff --git a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala deleted file mode 100644 index 44d75cd..0000000 --- a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala +++ /dev/null @@ -1,102 +0,0 @@ -package xyz.driver.core -package messaging - -import akka.NotUsed -import akka.stream.Materializer -import akka.stream.scaladsl.{Flow, RestartSource, Sink, Source} - -import scala.collection.mutable.ListBuffer -import scala.concurrent.Future -import scala.concurrent.duration._ - -/** An extension to message buses that offers an Akka-Streams API. - * - * Example usage of a stream that subscribes to one topic, prints any messages - * it receives and finally acknowledges them - * their receipt. - * {{{ - * val bus: StreamBus = ??? - * val topic = Topic.string("topic") - * bus.subscribe(topic1) - * .map{ msg => - * print(msg.data) - * msg - * } - * .to(bus.acknowledge) - * .run() - * }}} */ -trait StreamBus extends Bus { - - /** Flow that publishes any messages to a given topic. - * Emits messages once they have been published to the underlying bus. */ - def publish[A](topic: Topic[A]): Flow[A, A, NotUsed] = { - Flow[A] - .batch(defaultMaxMessages.toLong, a => ListBuffer[A](a))(_ += _) - .mapAsync(1) { a => - publishMessages(topic, a).map(_ => a) - } - .mapConcat(list => list.toList) - } - - /** Sink that acknowledges the receipt of a message. */ - def acknowledge: Sink[MessageId, NotUsed] = { - Flow[MessageId] - .batch(defaultMaxMessages.toLong, a => ListBuffer[MessageId](a))(_ += _) - .mapAsync(1)(acknowledgeMessages(_)) - .to(Sink.ignore) - } - - /** Source that listens to a subscription and receives any messages sent to its topic. */ - def subscribe[A]( - topic: Topic[A], - config: SubscriptionConfig = defaultSubscriptionConfig): Source[Message[A], NotUsed] = { - Source - .unfoldAsync((topic, config))( - topicAndConfig => - fetchMessages(topicAndConfig._1, topicAndConfig._2, defaultMaxMessages).map(msgs => - Some(topicAndConfig -> msgs)) - ) - .filter(_.nonEmpty) - .mapConcat(messages => messages.toList) - } - - def runWithRestart[A]( - topic: Topic[A], - config: SubscriptionConfig = defaultSubscriptionConfig, - minBackoff: FiniteDuration = 3.seconds, - maxBackoff: FiniteDuration = 30.seconds, - randomFactor: Double = 0.2, - maxRestarts: Int = 20 - )(processMessage: Flow[Message[A], List[MessageId], NotUsed])(implicit mat: Materializer): NotUsed = { - RestartSource - .withBackoff[MessageId]( - minBackoff, - maxBackoff, - randomFactor, - maxRestarts - ) { () => - subscribe(topic, config) - .via(processMessage) - .log(topic.name) - .mapConcat(identity) - } - .to(acknowledge) - .run() - } - - def handleMessage[A]( - topic: Topic[A], - config: SubscriptionConfig = defaultSubscriptionConfig, - parallelism: Int = 1, - minBackoff: FiniteDuration = 3.seconds, - maxBackoff: FiniteDuration = 30.seconds, - randomFactor: Double = 0.2, - maxRestarts: Int = 20 - )(processMessage: A => Future[_])(implicit mat: Materializer): NotUsed = { - runWithRestart(topic, config, minBackoff, maxBackoff, randomFactor, maxRestarts) { - Flow[Message[A]].mapAsync(parallelism) { message => - processMessage(message.data).map(_ => message.id :: Nil) - } - } - } -} diff --git a/src/main/scala/xyz/driver/core/messaging/Topic.scala b/src/main/scala/xyz/driver/core/messaging/Topic.scala deleted file mode 100644 index 32fd764..0000000 --- a/src/main/scala/xyz/driver/core/messaging/Topic.scala +++ /dev/null @@ -1,43 +0,0 @@ -package xyz.driver.core -package messaging - -import java.nio.ByteBuffer - -/** A topic is a named group of messages that all share a common schema. - * @tparam Message type of messages sent over this topic */ -trait Topic[Message] { - - /** Name of this topic (must be unique). */ - def name: String - - /** Convert a message to its wire format that will be sent over a bus. */ - def serialize(message: Message): ByteBuffer - - /** Convert a message from its wire format. */ - def deserialize(message: ByteBuffer): Message - -} - -object Topic { - - /** Create a new "raw" topic without a schema, providing access to the underlying bytes of messages. */ - def raw(name0: String): Topic[ByteBuffer] = new Topic[ByteBuffer] { - def name = name0 - override def serialize(message: ByteBuffer): ByteBuffer = message - override def deserialize(message: ByteBuffer): ByteBuffer = message - } - - /** Create a topic that represents data as UTF-8 encoded strings. */ - def string(name0: String): Topic[String] = new Topic[String] { - def name = name0 - override def serialize(message: String): ByteBuffer = { - ByteBuffer.wrap(message.getBytes("utf-8")) - } - override def deserialize(message: ByteBuffer): String = { - val bytes = new Array[Byte](message.remaining()) - message.get(bytes) - new String(bytes, "utf-8") - } - } - -} diff --git a/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala b/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala deleted file mode 100644 index b5e8678..0000000 --- a/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala +++ /dev/null @@ -1,108 +0,0 @@ -package xyz.driver.core.storage - -import java.io.ByteArrayInputStream -import java.net.URL -import java.nio.file.Path -import java.util.Date - -import akka.Done -import akka.stream.scaladsl.{Sink, Source, StreamConverters} -import akka.util.ByteString -import com.aliyun.oss.OSSClient -import com.aliyun.oss.model.ObjectPermission -import com.typesafe.config.Config -import xyz.driver.core.time.provider.TimeProvider - -import scala.collection.JavaConverters._ -import scala.concurrent.duration.Duration -import scala.concurrent.{ExecutionContext, Future} - -class AliyunBlobStorage( - client: OSSClient, - bucketId: String, - timeProvider: TimeProvider, - chunkSize: Int = AliyunBlobStorage.DefaultChunkSize)(implicit ec: ExecutionContext) - extends SignedBlobStorage { - override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future { - client.putObject(bucketId, name, new ByteArrayInputStream(content)) - name - } - - override def uploadFile(name: String, content: Path): Future[String] = Future { - client.putObject(bucketId, name, content.toFile) - name - } - - override def exists(name: String): Future[Boolean] = Future { - client.doesObjectExist(bucketId, name) - } - - override def list(prefix: String): Future[Set[String]] = Future { - client.listObjects(bucketId, prefix).getObjectSummaries.asScala.map(_.getKey)(collection.breakOut) - } - - override def content(name: String): Future[Option[Array[Byte]]] = Future { - Option(client.getObject(bucketId, name)).map { obj => - val inputStream = obj.getObjectContent - Stream.continually(inputStream.read).takeWhile(_ != -1).map(_.toByte).toArray - } - } - - override def download(name: String): Future[Option[Source[ByteString, Any]]] = Future { - Option(client.getObject(bucketId, name)).map { obj => - StreamConverters.fromInputStream(() => obj.getObjectContent, chunkSize) - } - } - - override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future { - StreamConverters - .asInputStream() - .mapMaterializedValue(is => - Future { - client.putObject(bucketId, name, is) - Done - }) - } - - override def delete(name: String): Future[String] = Future { - client.deleteObject(bucketId, name) - name - } - - override def url(name: String): Future[Option[URL]] = Future { - // Based on https://www.alibabacloud.com/help/faq-detail/39607.htm - Option(client.getObjectAcl(bucketId, name)).map { acl => - val isPrivate = acl.getPermission == ObjectPermission.Private - val bucket = client.getBucketInfo(bucketId).getBucket - val endpointUrl = if (isPrivate) bucket.getIntranetEndpoint else bucket.getExtranetEndpoint - new URL(s"https://$bucketId.$endpointUrl/$name") - } - } - - override def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]] = Future { - if (client.doesObjectExist(bucketId, name)) { - val expiration = new Date(timeProvider.currentTime().advanceBy(duration).millis) - Some(client.generatePresignedUrl(bucketId, name, expiration)) - } else { - None - } - } -} - -object AliyunBlobStorage { - val DefaultChunkSize: Int = 8192 - - def apply(config: Config, bucketId: String, timeProvider: TimeProvider)( - implicit ec: ExecutionContext): AliyunBlobStorage = { - val clientId = config.getString("storage.aliyun.clientId") - val clientSecret = config.getString("storage.aliyun.clientSecret") - val endpoint = config.getString("storage.aliyun.endpoint") - this(clientId, clientSecret, endpoint, bucketId, timeProvider) - } - - def apply(clientId: String, clientSecret: String, endpoint: String, bucketId: String, timeProvider: TimeProvider)( - implicit ec: ExecutionContext): AliyunBlobStorage = { - val client = new OSSClient(endpoint, clientId, clientSecret) - new AliyunBlobStorage(client, bucketId, timeProvider) - } -} diff --git a/src/main/scala/xyz/driver/core/storage/BlobStorage.scala b/src/main/scala/xyz/driver/core/storage/BlobStorage.scala deleted file mode 100644 index 0cde96a..0000000 --- a/src/main/scala/xyz/driver/core/storage/BlobStorage.scala +++ /dev/null @@ -1,50 +0,0 @@ -package xyz.driver.core.storage - -import java.net.URL -import java.nio.file.Path - -import akka.Done -import akka.stream.scaladsl.{Sink, Source} -import akka.util.ByteString - -import scala.concurrent.Future -import scala.concurrent.duration.Duration - -/** Binary key-value store, typically implemented by cloud storage. */ -trait BlobStorage { - - /** Upload data by value. */ - def uploadContent(name: String, content: Array[Byte]): Future[String] - - /** Upload data from an existing file. */ - def uploadFile(name: String, content: Path): Future[String] - - def exists(name: String): Future[Boolean] - - /** List available keys. The prefix determines which keys should be listed - * and depends on the implementation (for instance, a file system backed - * blob store will treat a prefix as a directory path). */ - def list(prefix: String): Future[Set[String]] - - /** Get all the content of a given object. */ - def content(name: String): Future[Option[Array[Byte]]] - - /** Stream data asynchronously and with backpressure. */ - def download(name: String): Future[Option[Source[ByteString, Any]]] - - /** Get a sink to upload data. */ - def upload(name: String): Future[Sink[ByteString, Future[Done]]] - - /** Delete a stored value. */ - def delete(name: String): Future[String] - - /** - * Path to specified resource. Checks that the resource exists and returns None if - * it is not found. Depending on the implementation, may throw. - */ - def url(name: String): Future[Option[URL]] -} - -trait SignedBlobStorage extends BlobStorage { - def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]] -} diff --git a/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala b/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala deleted file mode 100644 index e12c73d..0000000 --- a/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala +++ /dev/null @@ -1,82 +0,0 @@ -package xyz.driver.core.storage - -import java.net.URL -import java.nio.file.{Files, Path, StandardCopyOption} - -import akka.stream.scaladsl.{FileIO, Sink, Source} -import akka.util.ByteString -import akka.{Done, NotUsed} - -import scala.collection.JavaConverters._ -import scala.concurrent.{ExecutionContext, Future} - -/** A blob store that is backed by a local filesystem. All objects are stored relative to the given - * root path. Slashes ('/') in blob names are treated as usual path separators and are converted - * to directories. */ -class FileSystemBlobStorage(root: Path)(implicit ec: ExecutionContext) extends BlobStorage { - - private def ensureParents(file: Path): Path = { - Files.createDirectories(file.getParent()) - file - } - - private def file(name: String) = root.resolve(name) - - override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future { - Files.write(ensureParents(file(name)), content) - name - } - override def uploadFile(name: String, content: Path): Future[String] = Future { - Files.copy(content, ensureParents(file(name)), StandardCopyOption.REPLACE_EXISTING) - name - } - - override def exists(name: String): Future[Boolean] = Future { - val path = file(name) - Files.exists(path) && Files.isReadable(path) - } - - override def list(prefix: String): Future[Set[String]] = Future { - val dir = file(prefix) - Files - .list(dir) - .iterator() - .asScala - .map(p => root.relativize(p)) - .map(_.toString) - .toSet - } - - override def content(name: String): Future[Option[Array[Byte]]] = exists(name) map { - case true => - Some(Files.readAllBytes(file(name))) - case false => None - } - - override def download(name: String): Future[Option[Source[ByteString, NotUsed]]] = Future { - if (Files.exists(file(name))) { - Some(FileIO.fromPath(file(name)).mapMaterializedValue(_ => NotUsed)) - } else { - None - } - } - - override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future { - val f = ensureParents(file(name)) - FileIO.toPath(f).mapMaterializedValue(_.map(_ => Done)) - } - - override def delete(name: String): Future[String] = exists(name).map { e => - if (e) { - Files.delete(file(name)) - } - name - } - - override def url(name: String): Future[Option[URL]] = exists(name) map { - case true => - Some(root.resolve(name).toUri.toURL) - case false => - None - } -} diff --git a/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala b/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala deleted file mode 100644 index 95164c7..0000000 --- a/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala +++ /dev/null @@ -1,96 +0,0 @@ -package xyz.driver.core.storage - -import java.io.{FileInputStream, InputStream} -import java.net.URL -import java.nio.file.Path - -import akka.Done -import akka.stream.scaladsl.Sink -import akka.util.ByteString -import com.google.api.gax.paging.Page -import com.google.auth.oauth2.ServiceAccountCredentials -import com.google.cloud.storage.Storage.BlobListOption -import com.google.cloud.storage.{Blob, BlobId, Bucket, Storage, StorageOptions} - -import scala.collection.JavaConverters._ -import scala.concurrent.duration.Duration -import scala.concurrent.{ExecutionContext, Future} - -class GcsBlobStorage(client: Storage, bucketId: String, chunkSize: Int = GcsBlobStorage.DefaultChunkSize)( - implicit ec: ExecutionContext) - extends BlobStorage with SignedBlobStorage { - - private val bucket: Bucket = client.get(bucketId) - require(bucket != null, s"Bucket $bucketId does not exist.") - - override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future { - bucket.create(name, content).getBlobId.getName - } - - override def uploadFile(name: String, content: Path): Future[String] = Future { - bucket.create(name, new FileInputStream(content.toFile)).getBlobId.getName - } - - override def exists(name: String): Future[Boolean] = Future { - bucket.get(name) != null - } - - override def list(prefix: String): Future[Set[String]] = Future { - val page: Page[Blob] = bucket.list(BlobListOption.prefix(prefix)) - page - .iterateAll() - .asScala - .map(_.getName()) - .toSet - } - - override def content(name: String): Future[Option[Array[Byte]]] = Future { - Option(bucket.get(name)).map(blob => blob.getContent()) - } - - override def download(name: String) = Future { - Option(bucket.get(name)).map { blob => - ChannelStream.fromChannel(() => blob.reader(), chunkSize) - } - } - - override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future { - val blob = bucket.create(name, Array.emptyByteArray) - ChannelStream.toChannel(() => blob.writer(), chunkSize) - } - - override def delete(name: String): Future[String] = Future { - client.delete(BlobId.of(bucketId, name)) - name - } - - override def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]] = Future { - Option(bucket.get(name)).map(blob => blob.signUrl(duration.length, duration.unit)) - } - - override def url(name: String): Future[Option[URL]] = Future { - val protocol: String = "https" - val resourcePath: String = s"storage.googleapis.com/${bucket.getName}/" - Option(bucket.get(name)).map { blob => - new URL(protocol, resourcePath, blob.getName) - } - } -} - -object GcsBlobStorage { - final val DefaultChunkSize = 8192 - - private def newClient(key: InputStream): Storage = - StorageOptions - .newBuilder() - .setCredentials(ServiceAccountCredentials.fromStream(key)) - .build() - .getService() - - def fromKeyfile(keyfile: Path, bucketId: String, chunkSize: Int = DefaultChunkSize)( - implicit ec: ExecutionContext): GcsBlobStorage = { - val client = newClient(new FileInputStream(keyfile.toFile)) - new GcsBlobStorage(client, bucketId, chunkSize) - } - -} diff --git a/src/main/scala/xyz/driver/core/storage/channelStreams.scala b/src/main/scala/xyz/driver/core/storage/channelStreams.scala deleted file mode 100644 index fc652be..0000000 --- a/src/main/scala/xyz/driver/core/storage/channelStreams.scala +++ /dev/null @@ -1,112 +0,0 @@ -package xyz.driver.core.storage - -import java.nio.ByteBuffer -import java.nio.channels.{ReadableByteChannel, WritableByteChannel} - -import akka.stream._ -import akka.stream.scaladsl.{Sink, Source} -import akka.stream.stage._ -import akka.util.ByteString -import akka.{Done, NotUsed} - -import scala.concurrent.{Future, Promise} -import scala.util.control.NonFatal - -class ChannelSource(createChannel: () => ReadableByteChannel, chunkSize: Int) - extends GraphStage[SourceShape[ByteString]] { - - val out = Outlet[ByteString]("ChannelSource.out") - val shape = SourceShape(out) - - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { - val channel = createChannel() - - object Handler extends OutHandler { - override def onPull(): Unit = { - try { - val buffer = ByteBuffer.allocate(chunkSize) - if (channel.read(buffer) > 0) { - buffer.flip() - push(out, ByteString.fromByteBuffer(buffer)) - } else { - completeStage() - } - } catch { - case NonFatal(_) => - channel.close() - } - } - override def onDownstreamFinish(): Unit = { - channel.close() - } - } - - setHandler(out, Handler) - } - -} - -class ChannelSink(createChannel: () => WritableByteChannel, chunkSize: Int) - extends GraphStageWithMaterializedValue[SinkShape[ByteString], Future[Done]] { - - val in = Inlet[ByteString]("ChannelSink.in") - val shape = SinkShape(in) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = { - val promise = Promise[Done]() - val logic = new GraphStageLogic(shape) { - val channel = createChannel() - - object Handler extends InHandler { - override def onPush(): Unit = { - try { - val data = grab(in) - channel.write(data.asByteBuffer) - pull(in) - } catch { - case NonFatal(e) => - channel.close() - promise.failure(e) - } - } - - override def onUpstreamFinish(): Unit = { - channel.close() - completeStage() - promise.success(Done) - } - - override def onUpstreamFailure(ex: Throwable): Unit = { - channel.close() - promise.failure(ex) - } - } - - setHandler(in, Handler) - - override def preStart(): Unit = { - pull(in) - } - } - (logic, promise.future) - } - -} - -object ChannelStream { - - def fromChannel(channel: () => ReadableByteChannel, chunkSize: Int = 8192): Source[ByteString, NotUsed] = { - Source - .fromGraph(new ChannelSource(channel, chunkSize)) - .withAttributes(Attributes(ActorAttributes.IODispatcher)) - .async - } - - def toChannel(channel: () => WritableByteChannel, chunkSize: Int = 8192): Sink[ByteString, Future[Done]] = { - Sink - .fromGraph(new ChannelSink(channel, chunkSize)) - .withAttributes(Attributes(ActorAttributes.IODispatcher)) - .async - } - -} diff --git a/src/test/scala/xyz/driver/core/BlobStorageTest.scala b/src/test/scala/xyz/driver/core/BlobStorageTest.scala deleted file mode 100644 index 811cc60..0000000 --- a/src/test/scala/xyz/driver/core/BlobStorageTest.scala +++ /dev/null @@ -1,94 +0,0 @@ -package xyz.driver.core - -import java.nio.file.Files - -import akka.actor.ActorSystem -import akka.stream.ActorMaterializer -import akka.stream.scaladsl._ -import akka.util.ByteString -import org.scalatest._ -import org.scalatest.concurrent.ScalaFutures -import xyz.driver.core.storage.{BlobStorage, FileSystemBlobStorage} - -import scala.concurrent.Future -import scala.concurrent.duration._ -import scala.language.postfixOps - -class BlobStorageTest extends FlatSpec with ScalaFutures { - - implicit val patientce = PatienceConfig(timeout = 100.seconds) - - implicit val system = ActorSystem("blobstorage-test") - implicit val mat = ActorMaterializer() - import system.dispatcher - - def storageBehaviour(storage: BlobStorage) = { - val key = "foo" - val data = "hello world".getBytes - it should "upload data" in { - assert(storage.exists(key).futureValue === false) - assert(storage.uploadContent(key, data).futureValue === key) - assert(storage.exists(key).futureValue === true) - } - it should "download data" in { - val content = storage.content(key).futureValue - assert(content.isDefined) - assert(content.get === data) - } - it should "not download non-existing data" in { - assert(storage.content("bar").futureValue.isEmpty) - } - it should "overwrite an existing key" in { - val newData = "new string".getBytes("utf-8") - assert(storage.uploadContent(key, newData).futureValue === key) - assert(storage.content(key).futureValue.get === newData) - } - it should "upload a file" in { - val tmp = Files.createTempFile("testfile", "txt") - Files.write(tmp, data) - assert(storage.uploadFile(key, tmp).futureValue === key) - Files.delete(tmp) - } - it should "upload content" in { - val text = "foobar" - val src = Source - .single(text) - .map(l => ByteString(l)) - src.runWith(storage.upload(key).futureValue).futureValue - assert(storage.content(key).futureValue.map(_.toSeq) === Some("foobar".getBytes.toSeq)) - } - it should "delete content" in { - assert(storage.exists(key).futureValue) - storage.delete(key).futureValue - assert(!storage.exists(key).futureValue) - } - it should "download content" in { - storage.uploadContent(key, data) futureValue - val srcOpt = storage.download(key).futureValue - assert(srcOpt.isDefined) - val src = srcOpt.get - val content: Future[Array[Byte]] = src.runWith(Sink.fold(Array[Byte]())(_ ++ _)) - assert(content.futureValue === data) - } - it should "list keys" in { - assert(storage.list("").futureValue === Set(key)) - storage.uploadContent("a/a.txt", data).futureValue - storage.uploadContent("a/b", data).futureValue - storage.uploadContent("c/d", data).futureValue - storage.uploadContent("d", data).futureValue - assert(storage.list("").futureValue === Set(key, "a", "c", "d")) - assert(storage.list("a").futureValue === Set("a/a.txt", "a/b")) - assert(storage.list("a").futureValue === Set("a/a.txt", "a/b")) - assert(storage.list("c").futureValue === Set("c/d")) - } - it should "get valid URL" in { - assert(storage.exists(key).futureValue === true) - val fooUrl = storage.url(key).futureValue - assert(fooUrl.isDefined) - } - } - - "File system storage" should behave like storageBehaviour( - new FileSystemBlobStorage(Files.createTempDirectory("test"))) - -} diff --git a/src/test/scala/xyz/driver/core/messaging/QueueBusTest.scala b/src/test/scala/xyz/driver/core/messaging/QueueBusTest.scala deleted file mode 100644 index 8dd0776..0000000 --- a/src/test/scala/xyz/driver/core/messaging/QueueBusTest.scala +++ /dev/null @@ -1,30 +0,0 @@ -package xyz.driver.core.messaging - -import akka.actor.ActorSystem -import org.scalatest.FlatSpec -import org.scalatest.concurrent.ScalaFutures - -import scala.concurrent.ExecutionContext -import scala.concurrent.duration._ - -class QueueBusTest extends FlatSpec with ScalaFutures { - implicit val patience: PatienceConfig = PatienceConfig(timeout = 10.seconds) - - def busBehaviour(bus: Bus)(implicit ec: ExecutionContext): Unit = { - - it should "deliver messages to a subscriber" in { - val topic = Topic.string("test.topic1") - bus.fetchMessages(topic).futureValue - bus.publishMessages(topic, Seq("hello world!")) - Thread.sleep(100) - val messages = bus.fetchMessages(topic) - assert(messages.futureValue.map(_.data).toList == List("hello world!")) - } - } - - implicit val system: ActorSystem = ActorSystem("queue-test") - import system.dispatcher - - "A queue-based bus" should behave like busBehaviour(new QueueBus) - -} -- cgit v1.2.3