author    Jakob Odersky <jakob@driver.xyz>   2018-09-12 16:18:26 -0700
committer Jakob Odersky <jakob@odersky.com>  2018-10-09 16:19:39 -0700
commit    7c755c77afbd67ae2ded9d8b004736d4e27e208f (patch)
tree      e93f4590165a338ed284adeb6f4a6bd43bb16b6a /src
parent    76db2360364a35be31414a12cbc419a534a51744 (diff)
download  driver-core-7c755c77afbd67ae2ded9d8b004736d4e27e208f.tar.gz
          driver-core-7c755c77afbd67ae2ded9d8b004736d4e27e208f.tar.bz2
          driver-core-7c755c77afbd67ae2ded9d8b004736d4e27e208f.zip
Move storage and messaging to separate projects
Diffstat (limited to 'src')
-rw-r--r--  src/main/scala/xyz/driver/core/messaging/AliyunBus.scala | 157
-rw-r--r--  src/main/scala/xyz/driver/core/messaging/Bus.scala | 74
-rw-r--r--  src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala | 49
-rw-r--r--  src/main/scala/xyz/driver/core/messaging/GoogleBus.scala | 267
-rw-r--r--  src/main/scala/xyz/driver/core/messaging/QueueBus.scala | 126
-rw-r--r--  src/main/scala/xyz/driver/core/messaging/StreamBus.scala | 102
-rw-r--r--  src/main/scala/xyz/driver/core/messaging/Topic.scala | 43
-rw-r--r--  src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala | 108
-rw-r--r--  src/main/scala/xyz/driver/core/storage/BlobStorage.scala | 50
-rw-r--r--  src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala | 82
-rw-r--r--  src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala | 96
-rw-r--r--  src/main/scala/xyz/driver/core/storage/channelStreams.scala | 112
-rw-r--r--  src/test/scala/xyz/driver/core/BlobStorageTest.scala | 94
-rw-r--r--  src/test/scala/xyz/driver/core/messaging/QueueBusTest.scala | 30
14 files changed, 0 insertions, 1390 deletions
diff --git a/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala b/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala
deleted file mode 100644
index 8b7bca7..0000000
--- a/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-package xyz.driver.core.messaging
-import java.nio.ByteBuffer
-import java.util
-
-import com.aliyun.mns.client.{AsyncCallback, CloudAccount}
-import com.aliyun.mns.common.ServiceException
-import com.aliyun.mns.model
-import com.aliyun.mns.model._
-
-import scala.concurrent.duration._
-import scala.concurrent.{ExecutionContext, Future, Promise}
-
-class AliyunBus(
- accountId: String,
- accessId: String,
- accessSecret: String,
- region: String,
- namespace: String,
- pullTimeout: Int
-)(implicit val executionContext: ExecutionContext)
- extends Bus {
- private val endpoint = s"https://$accountId.mns.$region.aliyuncs.com"
- private val cloudAccount = new CloudAccount(accessId, accessSecret, endpoint)
- private val client = cloudAccount.getMNSClient
-
- // When calling the asyncBatchPopMessage endpoint, alicloud returns an error if no message is received before the
- // pullTimeout. This error is documented as MessageNotExist; however, it has been observed to occasionally return
- // InternalServerError. We mask both of these errors and return an empty list of messages.
- private val MaskedErrorCodes: Set[String] = Set("MessageNotExist", "InternalServerError")
-
- override val defaultMaxMessages: Int = 10
-
- case class MessageId(queueName: String, messageHandle: String)
-
- case class Message[A](id: MessageId, data: A) extends BasicMessage[A]
-
- case class SubscriptionConfig(
- subscriptionPrefix: String = accessId,
- ackTimeout: FiniteDuration = 10.seconds
- )
-
- override val defaultSubscriptionConfig: SubscriptionConfig = SubscriptionConfig()
-
- private def rawTopicName(topic: Topic[_]) =
- s"$namespace-${topic.name}"
- private def rawSubscriptionName(config: SubscriptionConfig, topic: Topic[_]) =
- s"$namespace-${config.subscriptionPrefix}-${topic.name}"
-
- override def fetchMessages[A](
- topic: Topic[A],
- config: SubscriptionConfig,
- maxMessages: Int): Future[Seq[Message[A]]] = {
- import collection.JavaConverters._
- val subscriptionName = rawSubscriptionName(config, topic)
- val queueRef = client.getQueueRef(subscriptionName)
-
- val promise = Promise[Seq[model.Message]]
- queueRef.asyncBatchPopMessage(
- maxMessages,
- pullTimeout,
- new AsyncCallback[util.List[model.Message]] {
- override def onSuccess(result: util.List[model.Message]): Unit = {
- promise.success(result.asScala)
- }
- override def onFail(ex: Exception): Unit = ex match {
- case serviceException: ServiceException if MaskedErrorCodes(serviceException.getErrorCode) =>
- promise.success(Nil)
- case _ =>
- promise.failure(ex)
- }
- }
- )
-
- promise.future.map(_.map { message =>
- import scala.xml.XML
- val messageId = MessageId(subscriptionName, message.getReceiptHandle)
- val messageXML = XML.loadString(message.getMessageBodyAsRawString)
- val messageNode = messageXML \ "Message"
- val messageBytes = java.util.Base64.getDecoder.decode(messageNode.head.text)
-
- val deserializedMessage = topic.deserialize(ByteBuffer.wrap(messageBytes))
- Message(messageId, deserializedMessage)
- })
- }
-
- override def acknowledgeMessages(messages: Seq[MessageId]): Future[Unit] = {
- import collection.JavaConverters._
- require(messages.nonEmpty, "Acknowledged message list must be non-empty")
-
- val queueRef = client.getQueueRef(messages.head.queueName)
-
- val promise = Promise[Unit]
- queueRef.asyncBatchDeleteMessage(
- messages.map(_.messageHandle).asJava,
- new AsyncCallback[Void] {
- override def onSuccess(result: Void): Unit = promise.success(())
- override def onFail(ex: Exception): Unit = promise.failure(ex)
- }
- )
-
- promise.future
- }
-
- override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = {
- val topicRef = client.getTopicRef(rawTopicName(topic))
-
- val publishMessages = messages.map { message =>
- val promise = Promise[TopicMessage]
-
- val topicMessage = new Base64TopicMessage
- topicMessage.setMessageBody(topic.serialize(message).array())
-
- topicRef.asyncPublishMessage(
- topicMessage,
- new AsyncCallback[TopicMessage] {
- override def onSuccess(result: TopicMessage): Unit = promise.success(result)
- override def onFail(ex: Exception): Unit = promise.failure(ex)
- }
- )
-
- promise.future
- }
-
- Future.sequence(publishMessages).map(_ => ())
- }
-
- def createTopic(topic: Topic[_]): Future[Unit] = Future {
- val topicName = rawTopicName(topic)
- val topicExists = Option(client.listTopic(topicName, "", 1)).exists(!_.getResult.isEmpty)
- if (!topicExists) {
- val topicMeta = new TopicMeta
- topicMeta.setTopicName(topicName)
- client.createTopic(topicMeta)
- }
- }
-
- def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = Future {
- val subscriptionName = rawSubscriptionName(config, topic)
- val queueExists = Option(client.listQueue(subscriptionName, "", 1)).exists(!_.getResult.isEmpty)
-
- if (!queueExists) {
- val topicName = rawTopicName(topic)
- val topicRef = client.getTopicRef(topicName)
-
- val queueMeta = new QueueMeta
- queueMeta.setQueueName(subscriptionName)
- queueMeta.setVisibilityTimeout(config.ackTimeout.toSeconds)
- client.createQueue(queueMeta)
-
- val subscriptionMeta = new SubscriptionMeta
- subscriptionMeta.setSubscriptionName(subscriptionName)
- subscriptionMeta.setTopicName(topicName)
- subscriptionMeta.setEndpoint(topicRef.generateQueueEndpoint(subscriptionName))
- topicRef.subscribe(subscriptionMeta)
- }
- }
-}
diff --git a/src/main/scala/xyz/driver/core/messaging/Bus.scala b/src/main/scala/xyz/driver/core/messaging/Bus.scala
deleted file mode 100644
index 75954f4..0000000
--- a/src/main/scala/xyz/driver/core/messaging/Bus.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-package xyz.driver.core
-package messaging
-
-import scala.concurrent._
-import scala.language.higherKinds
-
-/** Base trait for representing message buses.
- *
- * Message buses are expected to provide "at least once" delivery semantics and are
- * expected to retry delivery when a message remains unacknowledged for a reasonable
- * amount of time. */
-trait Bus {
-
- /** Type of unique message identifiers. Usually a string or UUID. */
- type MessageId
-
- /** Most general kind of message. Any implementation of a message bus must provide
- * the fields and methods specified in this trait. */
- trait BasicMessage[A] { self: Message[A] =>
-
- /** All messages must have unique IDs so that they can be acknowledged unambiguously. */
- def id: MessageId
-
- /** Actual message content. */
- def data: A
-
- }
-
- /** Actual type of messages provided by this bus. This must be a subtype of BasicMessage
- * (as that defines the minimal required fields of a message), but may be refined to
- * provide bus-specific additional data. */
- type Message[A] <: BasicMessage[A]
-
- /** Type of a bus-specific configuration object that can be used to tweak subscription settings. */
- type SubscriptionConfig
-
- /** Default value for a subscription configuration. It is such that any service will have a unique subscription
- * for every topic, shared among all its instances. */
- val defaultSubscriptionConfig: SubscriptionConfig
-
- /** Maximum number of messages handled in a single retrieval call. */
- val defaultMaxMessages = 64
-
- /** Execution context that is used to query and dispatch messages from this bus. */
- implicit val executionContext: ExecutionContext
-
- /** Retrieve any new messages in the mailbox of a subscription.
- *
- * Any retrieved messages become "outstanding" and should not be returned by this function
- * again until a reasonable (bus-specific) amount of time has passed and they remain unacknowledged.
- * In that case, they will again be considered new and will be returned by this function.
- *
- * Note that although outstanding and acknowledged messages will eventually be removed from
- * mailboxes, no guarantee can be made that a message will be delivered only once. */
- def fetchMessages[A](
- topic: Topic[A],
- config: SubscriptionConfig = defaultSubscriptionConfig,
- maxMessages: Int = defaultMaxMessages): Future[Seq[Message[A]]]
-
- /** Acknowledge that a given series of messages has been handled and should
- * not be delivered again.
- *
- * Note that acknowledgement is only eventual, and hence messages may be delivered more than once.
- * @see fetchMessages()
- */
- def acknowledgeMessages(messages: Seq[MessageId]): Future[Unit]
-
- /** Send a series of messages to a topic.
- *
- * The returned future will complete once messages have been accepted to the underlying bus.
- * No guarantee can be made of their delivery to subscribers. */
- def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit]
-
-}
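For reference, a minimal sketch of how the removed Bus API was driven from service code; the bus instance, topic name and helper are illustrative assumptions, not code from this repository:

    import scala.concurrent.Future
    import xyz.driver.core.messaging.{Bus, Topic}

    // Poll a topic once and acknowledge whatever was received (illustrative helper).
    def pollOnce(bus: Bus): Future[Unit] = {
      import bus.executionContext
      val topic = Topic.string("example.topic")
      for {
        messages <- bus.fetchMessages(topic) // new or redelivered unacknowledged messages
        _ <- if (messages.nonEmpty) bus.acknowledgeMessages(messages.map(_.id))
             else Future.unit
      } yield ()
    }
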
diff --git a/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala b/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala
deleted file mode 100644
index 1af5308..0000000
--- a/src/main/scala/xyz/driver/core/messaging/CreateOnDemand.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-package xyz.driver.core
-package messaging
-
-import java.util.concurrent.ConcurrentHashMap
-
-import scala.async.Async.{async, await}
-import scala.concurrent.Future
-
-/** Utility mixin that will ensure that topics and subscriptions exist before
- * attempting to read or write from or to them.
- */
-trait CreateOnDemand extends Bus {
-
- /** Create the given topic. This operation is idempotent and is expected to succeed if the topic
- * already exists.
- */
- def createTopic(topic: Topic[_]): Future[Unit]
-
- /** Create the given subscription. This operation is idempotent and is expected to succeed if the subscription
- * already exists.
- */
- def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit]
-
- private val createdTopics = new ConcurrentHashMap[Topic[_], Future[Unit]]
- private val createdSubscriptions = new ConcurrentHashMap[(Topic[_], SubscriptionConfig), Future[Unit]]
-
- private def ensureTopic(topic: Topic[_]) =
- createdTopics.computeIfAbsent(topic, t => createTopic(t))
-
- private def ensureSubscription(topic: Topic[_], config: SubscriptionConfig) =
- createdSubscriptions.computeIfAbsent(topic -> config, {
- case (t, c) => createSubscription(t, c)
- })
-
- abstract override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = async {
- await(ensureTopic(topic))
- await(super.publishMessages(topic, messages))
- }
-
- abstract override def fetchMessages[A](
- topic: Topic[A],
- config: SubscriptionConfig,
- maxMessages: Int): Future[Seq[Message[A]]] = async {
- await(ensureTopic(topic))
- await(ensureSubscription(topic, config))
- await(super.fetchMessages(topic, config, maxMessages))
- }
-
-}
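A sketch of the intended mixin pattern: stacking CreateOnDemand onto a concrete bus so topics and subscriptions are created before first use. The AliyunBus credentials below are placeholders:

    import scala.concurrent.ExecutionContext.Implicits.global
    import xyz.driver.core.messaging.{AliyunBus, CreateOnDemand}

    // Stackable trait: publishMessages/fetchMessages first ensure the topic and
    // subscription exist, then delegate to the underlying AliyunBus implementation.
    val bus = new AliyunBus(
      accountId = "<account-id>",
      accessId = "<access-id>",
      accessSecret = "<access-secret>",
      region = "cn-shanghai",
      namespace = "myservice",
      pullTimeout = 10
    ) with CreateOnDemand
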
diff --git a/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala b/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala
deleted file mode 100644
index b296c50..0000000
--- a/src/main/scala/xyz/driver/core/messaging/GoogleBus.scala
+++ /dev/null
@@ -1,267 +0,0 @@
-package xyz.driver.core
-package messaging
-
-import java.nio.ByteBuffer
-import java.nio.file.{Files, Path, Paths}
-import java.security.Signature
-import java.time.Instant
-import java.util
-
-import com.google.auth.oauth2.ServiceAccountCredentials
-import com.softwaremill.sttp._
-import spray.json.DefaultJsonProtocol._
-import spray.json._
-
-import scala.async.Async.{async, await}
-import scala.concurrent._
-import scala.concurrent.duration._
-
-/** A message bus implemented by [[https://cloud.google.com/pubsub/docs/overview Google's Pub/Sub service.]]
- *
- * == Overview ==
- *
- * The Pub/Sub message system is focused around a few concepts: 'topics',
- * 'subscriptions' and 'subscribers'. Messages are sent to ''topics'' which may
- * have multiple ''subscriptions'' associated to them. Every subscription to a
- * topic will receive all messages sent to the topic. Messages are enqueued in
- * a subscription until they are acknowledged by a ''subscriber''. Multiple
- * subscribers may be associated to a subscription, in which case messages will
- * get delivered arbitrarily among them.
- *
- * Topics and subscriptions are named resources which can be specified in
- * Pub/Sub's configuration and may be queried. Subscribers on the other hand,
- * are ephemeral processes that query a subscription on a regular basis, handle any
- * messages and acknowledge them.
- *
- * == Delivery semantics ==
- *
- * - at least once
- * - no ordering
- *
- * == Retention ==
- *
- * - configurable retry delay for unacknowledged messages, defaults to 10s
- * - undeliverable messages are kept for 7 days
- *
- * @param credentials Google cloud credentials, usually the same as used by a
- * service. Must have admin access to topics and
- * subscriptions.
- * @param namespace The namespace in which this bus is running. Will be used to
- * determine the exact name of topics and subscriptions.
- * @param pullTimeout Delay after which a call to fetchMessages() will return an
- * empty list, assuming that no messages have been received.
- * @param executionContext Execution context to run any blocking commands.
- * @param backend sttp backend used to query Pub/Sub's HTTP API
- */
-class GoogleBus(
- credentials: ServiceAccountCredentials,
- namespace: String,
- pullTimeout: Duration = 90.seconds
-)(implicit val executionContext: ExecutionContext, backend: SttpBackend[Future, _])
- extends Bus {
- import GoogleBus.Protocol
-
- case class MessageId(subscription: String, ackId: String)
-
- case class PubsubMessage[A](id: MessageId, data: A, publishTime: Instant) extends super.BasicMessage[A]
- type Message[A] = PubsubMessage[A]
-
- /** Subscription-specific configuration
- *
- * @param subscriptionPrefix An identifier used to uniquely determine the name of Pub/Sub subscriptions.
- * All messages sent to a subscription will be dispatched arbitrarily
- * among any subscribers. Defaults to the email of the credentials used by this
- * bus instance, thereby giving every service a unique subscription to every topic.
- * To give every service instance a unique subscription, this must be changed to a
- * unique value.
- * @param ackTimeout Duration in which a message must be acknowledged before it is delivered again.
- */
- case class SubscriptionConfig(
- subscriptionPrefix: String = credentials.getClientEmail.split("@")(0),
- ackTimeout: FiniteDuration = 10.seconds
- )
- override val defaultSubscriptionConfig: SubscriptionConfig = SubscriptionConfig()
-
- /** Obtain an authentication token valid for the given duration
- * https://developers.google.com/identity/protocols/OAuth2ServiceAccount
- */
- private def freshAuthToken(duration: FiniteDuration): Future[String] = {
- def jwt = {
- val now = Instant.now().getEpochSecond
- val base64 = util.Base64.getEncoder
- val header = base64.encodeToString("""{"alg":"RS256","typ":"JWT"}""".getBytes("utf-8"))
- val body = base64.encodeToString(
- s"""|{
- | "iss": "${credentials.getClientEmail}",
- | "scope": "https://www.googleapis.com/auth/pubsub",
- | "aud": "https://www.googleapis.com/oauth2/v4/token",
- | "exp": ${now + duration.toSeconds},
- | "iat": $now
- |}""".stripMargin.getBytes("utf-8")
- )
- val signer = Signature.getInstance("SHA256withRSA")
- signer.initSign(credentials.getPrivateKey)
- signer.update(s"$header.$body".getBytes("utf-8"))
- val signature = base64.encodeToString(signer.sign())
- s"$header.$body.$signature"
- }
- sttp
- .post(uri"https://www.googleapis.com/oauth2/v4/token")
- .body(
- "grant_type" -> "urn:ietf:params:oauth:grant-type:jwt-bearer",
- "assertion" -> jwt
- )
- .mapResponse(s => s.parseJson.asJsObject.fields("access_token").convertTo[String])
- .send()
- .map(_.unsafeBody)
- }
-
- // the token is cached a few minutes less than its validity to diminish latency of concurrent accesses at renewal time
- private val getToken: () => Future[String] = Refresh.every(55.minutes)(freshAuthToken(60.minutes))
-
- private val baseUri = uri"https://pubsub.googleapis.com/"
-
- private def rawTopicName(topic: Topic[_]) =
- s"projects/${credentials.getProjectId}/topics/$namespace.${topic.name}"
- private def rawSubscriptionName(config: SubscriptionConfig, topic: Topic[_]) =
- s"projects/${credentials.getProjectId}/subscriptions/$namespace.${config.subscriptionPrefix}.${topic.name}"
-
- def createTopic(topic: Topic[_]): Future[Unit] = async {
- val request = sttp
- .put(baseUri.path(s"v1/${rawTopicName(topic)}"))
- .auth
- .bearer(await(getToken()))
- val result = await(request.send())
- result.body match {
- case Left(error) if result.code != 409 => // 409 <=> topic already exists, ignore it
- throw new NoSuchElementException(s"Error creating topic: Status code ${result.code}: $error")
- case _ => ()
- }
- }
-
- def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = async {
- val request = sttp
- .put(baseUri.path(s"v1/${rawSubscriptionName(config, topic)}"))
- .auth
- .bearer(await(getToken()))
- .body(
- JsObject(
- "topic" -> rawTopicName(topic).toJson,
- "ackDeadlineSeconds" -> config.ackTimeout.toSeconds.toJson
- ).compactPrint
- )
- val result = await(request.send())
- result.body match {
- case Left(error) if result.code != 409 => // 409 <=> subscription already exists, ignore it
- throw new NoSuchElementException(s"Error creating subscription: Status code ${result.code}: $error")
- case _ => ()
- }
- }
-
- override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = async {
- import Protocol.bufferFormat
- val buffers: Seq[ByteBuffer] = messages.map(topic.serialize)
- val request = sttp
- .post(baseUri.path(s"v1/${rawTopicName(topic)}:publish"))
- .auth
- .bearer(await(getToken()))
- .body(
- JsObject("messages" -> buffers.map(buffer => JsObject("data" -> buffer.toJson)).toJson).compactPrint
- )
- await(request.send()).unsafeBody
- ()
- }
-
- override def fetchMessages[A](
- topic: Topic[A],
- subscriptionConfig: SubscriptionConfig,
- maxMessages: Int): Future[Seq[PubsubMessage[A]]] = async {
- val subscription = rawSubscriptionName(subscriptionConfig, topic)
- val request = sttp
- .post(baseUri.path(s"v1/$subscription:pull"))
- .auth
- .bearer(await(getToken().map(x => x)))
- .body(
- JsObject(
- "returnImmediately" -> JsFalse,
- "maxMessages" -> JsNumber(maxMessages)
- ).compactPrint
- )
- .readTimeout(pullTimeout)
- .mapResponse(_.parseJson)
-
- val messages = await(request.send()).unsafeBody match {
- case JsObject(fields) if fields.isEmpty => Seq()
- case obj => obj.convertTo[Protocol.SubscriptionPull].receivedMessages
- }
-
- messages.map { msg =>
- PubsubMessage[A](
- MessageId(subscription, msg.ackId),
- topic.deserialize(msg.message.data),
- msg.message.publishTime
- )
- }
- }
-
- override def acknowledgeMessages(messageIds: Seq[MessageId]): Future[Unit] = async {
- val request = sttp
- .post(baseUri.path(s"v1/${messageIds.head.subscription}:acknowledge"))
- .auth
- .bearer(await(getToken()))
- .body(
- JsObject("ackIds" -> JsArray(messageIds.toVector.map(m => JsString(m.ackId)))).compactPrint
- )
- await(request.send()).unsafeBody
- ()
- }
-
-}
-
-object GoogleBus {
-
- private object Protocol extends DefaultJsonProtocol {
- case class SubscriptionPull(receivedMessages: Seq[ReceivedMessage])
- case class ReceivedMessage(ackId: String, message: PubsubMessage)
- case class PubsubMessage(data: ByteBuffer, publishTime: Instant)
-
- implicit val timeFormat: JsonFormat[Instant] = new JsonFormat[Instant] {
- override def write(obj: Instant): JsValue = JsString(obj.toString)
- override def read(json: JsValue): Instant = Instant.parse(json.convertTo[String])
- }
- implicit val bufferFormat: JsonFormat[ByteBuffer] = new JsonFormat[ByteBuffer] {
- override def write(obj: ByteBuffer): JsValue =
- JsString(util.Base64.getEncoder.encodeToString(obj.array()))
-
- override def read(json: JsValue): ByteBuffer = {
- val encodedBytes = json.convertTo[String].getBytes("utf-8")
- val decodedBytes = util.Base64.getDecoder.decode(encodedBytes)
- ByteBuffer.wrap(decodedBytes)
- }
- }
-
- implicit val pubsubMessageFormat: RootJsonFormat[PubsubMessage] = jsonFormat2(PubsubMessage)
- implicit val receivedMessageFormat: RootJsonFormat[ReceivedMessage] = jsonFormat2(ReceivedMessage)
- implicit val subscrptionPullFormat: RootJsonFormat[SubscriptionPull] = jsonFormat1(SubscriptionPull)
- }
-
- def fromKeyfile(keyfile: Path, namespace: String)(
- implicit executionContext: ExecutionContext,
- backend: SttpBackend[Future, _]): GoogleBus = {
- val creds = ServiceAccountCredentials.fromStream(Files.newInputStream(keyfile))
- new GoogleBus(creds, namespace)
- }
-
- @deprecated(
- "Reading from the environment adds opaque dependencies and hence leads to extra complexity. Use fromKeyfile instead.",
- "driver-core 1.12.2")
- def fromEnv(implicit executionContext: ExecutionContext, backend: SttpBackend[Future, _]): GoogleBus = {
- def env(key: String) = {
- require(sys.env.contains(key), s"Environment variable $key is not set.")
- sys.env(key)
- }
- val keyfile = Paths.get(env("GOOGLE_APPLICATION_CREDENTIALS"))
- fromKeyfile(keyfile, env("SERVICE_NAMESPACE"))
- }
-
-}
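For context, GoogleBus instances were typically built via the fromKeyfile factory; the key path, namespace and choice of sttp backend below are assumptions for illustration:

    import java.nio.file.Paths
    import scala.concurrent.ExecutionContext.Implicits.global
    import com.softwaremill.sttp.asynchttpclient.future.AsyncHttpClientFutureBackend
    import xyz.driver.core.messaging.{GoogleBus, Topic}

    // Any SttpBackend[Future, _] works; the async-http-client backend is one choice.
    implicit val sttpBackend = AsyncHttpClientFutureBackend()

    val bus    = GoogleBus.fromKeyfile(Paths.get("/etc/secrets/gcloud-key.json"), namespace = "myservice")
    val events = Topic.string("example.event")
    bus.publishMessages(events, Seq("hello pubsub"))
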
diff --git a/src/main/scala/xyz/driver/core/messaging/QueueBus.scala b/src/main/scala/xyz/driver/core/messaging/QueueBus.scala
deleted file mode 100644
index 45c9ed5..0000000
--- a/src/main/scala/xyz/driver/core/messaging/QueueBus.scala
+++ /dev/null
@@ -1,126 +0,0 @@
-package xyz.driver.core
-package messaging
-
-import java.nio.ByteBuffer
-
-import akka.actor.{Actor, ActorSystem, Props}
-
-import scala.collection.mutable
-import scala.collection.mutable.Map
-import scala.concurrent.duration._
-import scala.concurrent.{Future, Promise}
-
-/** A bus backed by an asynchronous queue. Note that this bus requires a local actor system
- * and is intended for testing purposes only. */
-class QueueBus(implicit system: ActorSystem) extends Bus {
- import system.dispatcher
-
- override type SubscriptionConfig = Long
- override val defaultSubscriptionConfig: Long = 0
- override val executionContext = system.dispatcher
-
- override type MessageId = (String, SubscriptionConfig, Long)
- type Message[A] = BasicMessage[A]
-
- private object ActorMessages {
- case class Send[A](topic: Topic[A], messages: Seq[A])
- case class Ack(messages: Seq[MessageId])
- case class Fetch[A](topic: Topic[A], cfg: SubscriptionConfig, max: Int, response: Promise[Seq[Message[A]]])
- case object Unack
- }
-
- class Subscription {
- val mailbox: mutable.Map[MessageId, ByteBuffer] = mutable.Map.empty
- val unacked: mutable.Map[MessageId, ByteBuffer] = mutable.Map.empty
- }
-
- private class BusMaster extends Actor {
- var _id = 0L
- def nextId(topic: String, cfg: SubscriptionConfig): (String, SubscriptionConfig, Long) = {
- _id += 1; (topic, cfg, _id)
- }
-
- val topics: mutable.Map[String, mutable.Map[SubscriptionConfig, Subscription]] = mutable.Map.empty
-
- def ensureSubscription(topic: String, cfg: SubscriptionConfig): Unit = {
- topics.get(topic) match {
- case Some(t) =>
- t.getOrElseUpdate(cfg, new Subscription)
- case None =>
- topics += topic -> mutable.Map.empty
- ensureSubscription(topic, cfg)
- }
- }
-
- override def preStart(): Unit = {
- context.system.scheduler.schedule(1.seconds, 1.seconds) {
- self ! ActorMessages.Unack
- }
- }
-
- override def receive: Receive = {
- case ActorMessages.Send(topic, messages) =>
- val buffers = messages.map(topic.serialize)
- val subscriptions = topics.getOrElse(topic.name, Map.empty)
- for ((cfg, subscription) <- subscriptions) {
- for (buffer <- buffers) {
- subscription.mailbox += nextId(topic.name, cfg) -> buffer
- }
- }
-
- case ActorMessages.Fetch(topic, cfg, max, promise) =>
- ensureSubscription(topic.name, cfg)
- val subscription = topics(topic.name)(cfg)
- val messages = subscription.mailbox.take(max)
- subscription.unacked ++= messages
- subscription.mailbox --= messages.map(_._1)
- promise.success(messages.toSeq.map {
- case (ackId, buffer) =>
- new Message[Any] {
- val id = ackId
- val data = topic.deserialize(buffer)
- }
- })
-
- case ActorMessages.Ack(messageIds) =>
- for (id @ (topic, cfg, _) <- messageIds) {
- ensureSubscription(topic, cfg)
- val subscription = topics(topic)(cfg)
- subscription.unacked -= id
- }
-
- case ActorMessages.Unack =>
- for ((_, subscriptions) <- topics) {
- for ((_, subscription) <- subscriptions) {
- subscription.mailbox ++= subscription.unacked
- subscription.unacked.clear()
- }
- }
- }
-
- }
-
- val actor = system.actorOf(Props(new BusMaster))
-
- override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = Future {
- actor ! ActorMessages.Send(topic, messages)
- }
-
- override def fetchMessages[A](
- topic: messaging.Topic[A],
- config: SubscriptionConfig,
- maxMessages: Int): Future[Seq[Message[A]]] = {
- val result = Promise[Seq[Message[A]]]
- actor ! ActorMessages.Fetch(topic, config, maxMessages, result)
- result.future
- }
-
- override def acknowledgeMessages(ids: Seq[MessageId]): Future[Unit] = Future {
- actor ! ActorMessages.Ack(ids)
- }
-
-}
-
-object QueueBus {
- def apply(implicit system: ActorSystem) = new QueueBus
-}
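The QueueBusTest removed at the end of this diff shows the intended use of this bus in tests: construct it with an implicit ActorSystem, publish, then fetch and assert on the delivered payloads.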
diff --git a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
deleted file mode 100644
index 44d75cd..0000000
--- a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-package xyz.driver.core
-package messaging
-
-import akka.NotUsed
-import akka.stream.Materializer
-import akka.stream.scaladsl.{Flow, RestartSource, Sink, Source}
-
-import scala.collection.mutable.ListBuffer
-import scala.concurrent.Future
-import scala.concurrent.duration._
-
-/** An extension to message buses that offers an Akka-Streams API.
- *
- * Example usage of a stream that subscribes to one topic, prints any messages
- * it receives and finally acknowledges
- * their receipt.
- * {{{
- * val bus: StreamBus = ???
- * val topic = Topic.string("topic")
- * bus.subscribe(topic)
- * .map{ msg =>
- * print(msg.data)
- * msg
- * }
- * .to(bus.acknowledge)
- * .run()
- * }}} */
-trait StreamBus extends Bus {
-
- /** Flow that publishes any messages to a given topic.
- * Emits messages once they have been published to the underlying bus. */
- def publish[A](topic: Topic[A]): Flow[A, A, NotUsed] = {
- Flow[A]
- .batch(defaultMaxMessages.toLong, a => ListBuffer[A](a))(_ += _)
- .mapAsync(1) { a =>
- publishMessages(topic, a).map(_ => a)
- }
- .mapConcat(list => list.toList)
- }
-
- /** Sink that acknowledges the receipt of a message. */
- def acknowledge: Sink[MessageId, NotUsed] = {
- Flow[MessageId]
- .batch(defaultMaxMessages.toLong, a => ListBuffer[MessageId](a))(_ += _)
- .mapAsync(1)(acknowledgeMessages(_))
- .to(Sink.ignore)
- }
-
- /** Source that listens to a subscription and receives any messages sent to its topic. */
- def subscribe[A](
- topic: Topic[A],
- config: SubscriptionConfig = defaultSubscriptionConfig): Source[Message[A], NotUsed] = {
- Source
- .unfoldAsync((topic, config))(
- topicAndConfig =>
- fetchMessages(topicAndConfig._1, topicAndConfig._2, defaultMaxMessages).map(msgs =>
- Some(topicAndConfig -> msgs))
- )
- .filter(_.nonEmpty)
- .mapConcat(messages => messages.toList)
- }
-
- def runWithRestart[A](
- topic: Topic[A],
- config: SubscriptionConfig = defaultSubscriptionConfig,
- minBackoff: FiniteDuration = 3.seconds,
- maxBackoff: FiniteDuration = 30.seconds,
- randomFactor: Double = 0.2,
- maxRestarts: Int = 20
- )(processMessage: Flow[Message[A], List[MessageId], NotUsed])(implicit mat: Materializer): NotUsed = {
- RestartSource
- .withBackoff[MessageId](
- minBackoff,
- maxBackoff,
- randomFactor,
- maxRestarts
- ) { () =>
- subscribe(topic, config)
- .via(processMessage)
- .log(topic.name)
- .mapConcat(identity)
- }
- .to(acknowledge)
- .run()
- }
-
- def handleMessage[A](
- topic: Topic[A],
- config: SubscriptionConfig = defaultSubscriptionConfig,
- parallelism: Int = 1,
- minBackoff: FiniteDuration = 3.seconds,
- maxBackoff: FiniteDuration = 30.seconds,
- randomFactor: Double = 0.2,
- maxRestarts: Int = 20
- )(processMessage: A => Future[_])(implicit mat: Materializer): NotUsed = {
- runWithRestart(topic, config, minBackoff, maxBackoff, randomFactor, maxRestarts) {
- Flow[Message[A]].mapAsync(parallelism) { message =>
- processMessage(message.data).map(_ => message.id :: Nil)
- }
- }
- }
-}
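The handleMessage helper above pulls, processes and acknowledges messages in one restartable stream; a minimal sketch, with assumed names:

    import akka.NotUsed
    import akka.stream.Materializer
    import scala.concurrent.Future
    import xyz.driver.core.messaging.{StreamBus, Topic}

    // Process each payload; on success the message id is acknowledged automatically.
    def startConsumer(bus: StreamBus)(implicit mat: Materializer): NotUsed = {
      import bus.executionContext
      bus.handleMessage(Topic.string("example.event"), parallelism = 4) { payload =>
        Future(println(s"received: $payload"))
      }
    }
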
diff --git a/src/main/scala/xyz/driver/core/messaging/Topic.scala b/src/main/scala/xyz/driver/core/messaging/Topic.scala
deleted file mode 100644
index 32fd764..0000000
--- a/src/main/scala/xyz/driver/core/messaging/Topic.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-package xyz.driver.core
-package messaging
-
-import java.nio.ByteBuffer
-
-/** A topic is a named group of messages that all share a common schema.
- * @tparam Message type of messages sent over this topic */
-trait Topic[Message] {
-
- /** Name of this topic (must be unique). */
- def name: String
-
- /** Convert a message to its wire format that will be sent over a bus. */
- def serialize(message: Message): ByteBuffer
-
- /** Convert a message from its wire format. */
- def deserialize(message: ByteBuffer): Message
-
-}
-
-object Topic {
-
- /** Create a new "raw" topic without a schema, providing access to the underlying bytes of messages. */
- def raw(name0: String): Topic[ByteBuffer] = new Topic[ByteBuffer] {
- def name = name0
- override def serialize(message: ByteBuffer): ByteBuffer = message
- override def deserialize(message: ByteBuffer): ByteBuffer = message
- }
-
- /** Create a topic that represents data as UTF-8 encoded strings. */
- def string(name0: String): Topic[String] = new Topic[String] {
- def name = name0
- override def serialize(message: String): ByteBuffer = {
- ByteBuffer.wrap(message.getBytes("utf-8"))
- }
- override def deserialize(message: ByteBuffer): String = {
- val bytes = new Array[Byte](message.remaining())
- message.get(bytes)
- new String(bytes, "utf-8")
- }
- }
-
-}
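Beyond the raw and string helpers, services could define typed topics by implementing serialize/deserialize themselves; a sketch for a simple integer payload (topic name assumed):

    import java.nio.ByteBuffer
    import xyz.driver.core.messaging.Topic

    // A hand-rolled topic carrying a single Int per message.
    val counterTopic: Topic[Int] = new Topic[Int] {
      def name = "example.counter"
      override def serialize(message: Int): ByteBuffer = {
        val buffer = ByteBuffer.allocate(4).putInt(message)
        buffer.flip()
        buffer
      }
      override def deserialize(message: ByteBuffer): Int = message.getInt
    }
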
diff --git a/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala b/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala
deleted file mode 100644
index b5e8678..0000000
--- a/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-package xyz.driver.core.storage
-
-import java.io.ByteArrayInputStream
-import java.net.URL
-import java.nio.file.Path
-import java.util.Date
-
-import akka.Done
-import akka.stream.scaladsl.{Sink, Source, StreamConverters}
-import akka.util.ByteString
-import com.aliyun.oss.OSSClient
-import com.aliyun.oss.model.ObjectPermission
-import com.typesafe.config.Config
-import xyz.driver.core.time.provider.TimeProvider
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration.Duration
-import scala.concurrent.{ExecutionContext, Future}
-
-class AliyunBlobStorage(
- client: OSSClient,
- bucketId: String,
- timeProvider: TimeProvider,
- chunkSize: Int = AliyunBlobStorage.DefaultChunkSize)(implicit ec: ExecutionContext)
- extends SignedBlobStorage {
- override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future {
- client.putObject(bucketId, name, new ByteArrayInputStream(content))
- name
- }
-
- override def uploadFile(name: String, content: Path): Future[String] = Future {
- client.putObject(bucketId, name, content.toFile)
- name
- }
-
- override def exists(name: String): Future[Boolean] = Future {
- client.doesObjectExist(bucketId, name)
- }
-
- override def list(prefix: String): Future[Set[String]] = Future {
- client.listObjects(bucketId, prefix).getObjectSummaries.asScala.map(_.getKey)(collection.breakOut)
- }
-
- override def content(name: String): Future[Option[Array[Byte]]] = Future {
- Option(client.getObject(bucketId, name)).map { obj =>
- val inputStream = obj.getObjectContent
- Stream.continually(inputStream.read).takeWhile(_ != -1).map(_.toByte).toArray
- }
- }
-
- override def download(name: String): Future[Option[Source[ByteString, Any]]] = Future {
- Option(client.getObject(bucketId, name)).map { obj =>
- StreamConverters.fromInputStream(() => obj.getObjectContent, chunkSize)
- }
- }
-
- override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future {
- StreamConverters
- .asInputStream()
- .mapMaterializedValue(is =>
- Future {
- client.putObject(bucketId, name, is)
- Done
- })
- }
-
- override def delete(name: String): Future[String] = Future {
- client.deleteObject(bucketId, name)
- name
- }
-
- override def url(name: String): Future[Option[URL]] = Future {
- // Based on https://www.alibabacloud.com/help/faq-detail/39607.htm
- Option(client.getObjectAcl(bucketId, name)).map { acl =>
- val isPrivate = acl.getPermission == ObjectPermission.Private
- val bucket = client.getBucketInfo(bucketId).getBucket
- val endpointUrl = if (isPrivate) bucket.getIntranetEndpoint else bucket.getExtranetEndpoint
- new URL(s"https://$bucketId.$endpointUrl/$name")
- }
- }
-
- override def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]] = Future {
- if (client.doesObjectExist(bucketId, name)) {
- val expiration = new Date(timeProvider.currentTime().advanceBy(duration).millis)
- Some(client.generatePresignedUrl(bucketId, name, expiration))
- } else {
- None
- }
- }
-}
-
-object AliyunBlobStorage {
- val DefaultChunkSize: Int = 8192
-
- def apply(config: Config, bucketId: String, timeProvider: TimeProvider)(
- implicit ec: ExecutionContext): AliyunBlobStorage = {
- val clientId = config.getString("storage.aliyun.clientId")
- val clientSecret = config.getString("storage.aliyun.clientSecret")
- val endpoint = config.getString("storage.aliyun.endpoint")
- this(clientId, clientSecret, endpoint, bucketId, timeProvider)
- }
-
- def apply(clientId: String, clientSecret: String, endpoint: String, bucketId: String, timeProvider: TimeProvider)(
- implicit ec: ExecutionContext): AliyunBlobStorage = {
- val client = new OSSClient(endpoint, clientId, clientSecret)
- new AliyunBlobStorage(client, bucketId, timeProvider)
- }
-}
diff --git a/src/main/scala/xyz/driver/core/storage/BlobStorage.scala b/src/main/scala/xyz/driver/core/storage/BlobStorage.scala
deleted file mode 100644
index 0cde96a..0000000
--- a/src/main/scala/xyz/driver/core/storage/BlobStorage.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-package xyz.driver.core.storage
-
-import java.net.URL
-import java.nio.file.Path
-
-import akka.Done
-import akka.stream.scaladsl.{Sink, Source}
-import akka.util.ByteString
-
-import scala.concurrent.Future
-import scala.concurrent.duration.Duration
-
-/** Binary key-value store, typically implemented by cloud storage. */
-trait BlobStorage {
-
- /** Upload data by value. */
- def uploadContent(name: String, content: Array[Byte]): Future[String]
-
- /** Upload data from an existing file. */
- def uploadFile(name: String, content: Path): Future[String]
-
- def exists(name: String): Future[Boolean]
-
- /** List available keys. The prefix determines which keys should be listed
- * and depends on the implementation (for instance, a file system backed
- * blob store will treat a prefix as a directory path). */
- def list(prefix: String): Future[Set[String]]
-
- /** Get all the content of a given object. */
- def content(name: String): Future[Option[Array[Byte]]]
-
- /** Stream data asynchronously and with backpressure. */
- def download(name: String): Future[Option[Source[ByteString, Any]]]
-
- /** Get a sink to upload data. */
- def upload(name: String): Future[Sink[ByteString, Future[Done]]]
-
- /** Delete a stored value. */
- def delete(name: String): Future[String]
-
- /**
- * URL of the specified resource. Checks that the resource exists and returns None if
- * it is not found. Depending on the implementation, this may throw.
- */
- def url(name: String): Future[Option[URL]]
-}
-
-trait SignedBlobStorage extends BlobStorage {
- def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]]
-}
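A minimal sketch of code written against the BlobStorage trait; the helper and key names are illustrative and work with any backend (filesystem, GCS, Aliyun OSS):

    import scala.concurrent.{ExecutionContext, Future}
    import xyz.driver.core.storage.BlobStorage

    // Copy one blob to another key, if the source exists (illustrative helper).
    def copyBlob(storage: BlobStorage, from: String, to: String)(
        implicit ec: ExecutionContext): Future[Option[String]] =
      storage.content(from).flatMap {
        case Some(bytes) => storage.uploadContent(to, bytes).map(Some(_))
        case None        => Future.successful(None)
      }
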
diff --git a/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala b/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala
deleted file mode 100644
index e12c73d..0000000
--- a/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-package xyz.driver.core.storage
-
-import java.net.URL
-import java.nio.file.{Files, Path, StandardCopyOption}
-
-import akka.stream.scaladsl.{FileIO, Sink, Source}
-import akka.util.ByteString
-import akka.{Done, NotUsed}
-
-import scala.collection.JavaConverters._
-import scala.concurrent.{ExecutionContext, Future}
-
-/** A blob store that is backed by a local filesystem. All objects are stored relative to the given
- * root path. Slashes ('/') in blob names are treated as usual path separators and are converted
- * to directories. */
-class FileSystemBlobStorage(root: Path)(implicit ec: ExecutionContext) extends BlobStorage {
-
- private def ensureParents(file: Path): Path = {
- Files.createDirectories(file.getParent())
- file
- }
-
- private def file(name: String) = root.resolve(name)
-
- override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future {
- Files.write(ensureParents(file(name)), content)
- name
- }
- override def uploadFile(name: String, content: Path): Future[String] = Future {
- Files.copy(content, ensureParents(file(name)), StandardCopyOption.REPLACE_EXISTING)
- name
- }
-
- override def exists(name: String): Future[Boolean] = Future {
- val path = file(name)
- Files.exists(path) && Files.isReadable(path)
- }
-
- override def list(prefix: String): Future[Set[String]] = Future {
- val dir = file(prefix)
- Files
- .list(dir)
- .iterator()
- .asScala
- .map(p => root.relativize(p))
- .map(_.toString)
- .toSet
- }
-
- override def content(name: String): Future[Option[Array[Byte]]] = exists(name) map {
- case true =>
- Some(Files.readAllBytes(file(name)))
- case false => None
- }
-
- override def download(name: String): Future[Option[Source[ByteString, NotUsed]]] = Future {
- if (Files.exists(file(name))) {
- Some(FileIO.fromPath(file(name)).mapMaterializedValue(_ => NotUsed))
- } else {
- None
- }
- }
-
- override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future {
- val f = ensureParents(file(name))
- FileIO.toPath(f).mapMaterializedValue(_.map(_ => Done))
- }
-
- override def delete(name: String): Future[String] = exists(name).map { e =>
- if (e) {
- Files.delete(file(name))
- }
- name
- }
-
- override def url(name: String): Future[Option[URL]] = exists(name) map {
- case true =>
- Some(root.resolve(name).toUri.toURL)
- case false =>
- None
- }
-}
diff --git a/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala b/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala
deleted file mode 100644
index 95164c7..0000000
--- a/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-package xyz.driver.core.storage
-
-import java.io.{FileInputStream, InputStream}
-import java.net.URL
-import java.nio.file.Path
-
-import akka.Done
-import akka.stream.scaladsl.Sink
-import akka.util.ByteString
-import com.google.api.gax.paging.Page
-import com.google.auth.oauth2.ServiceAccountCredentials
-import com.google.cloud.storage.Storage.BlobListOption
-import com.google.cloud.storage.{Blob, BlobId, Bucket, Storage, StorageOptions}
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration.Duration
-import scala.concurrent.{ExecutionContext, Future}
-
-class GcsBlobStorage(client: Storage, bucketId: String, chunkSize: Int = GcsBlobStorage.DefaultChunkSize)(
- implicit ec: ExecutionContext)
- extends BlobStorage with SignedBlobStorage {
-
- private val bucket: Bucket = client.get(bucketId)
- require(bucket != null, s"Bucket $bucketId does not exist.")
-
- override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future {
- bucket.create(name, content).getBlobId.getName
- }
-
- override def uploadFile(name: String, content: Path): Future[String] = Future {
- bucket.create(name, new FileInputStream(content.toFile)).getBlobId.getName
- }
-
- override def exists(name: String): Future[Boolean] = Future {
- bucket.get(name) != null
- }
-
- override def list(prefix: String): Future[Set[String]] = Future {
- val page: Page[Blob] = bucket.list(BlobListOption.prefix(prefix))
- page
- .iterateAll()
- .asScala
- .map(_.getName())
- .toSet
- }
-
- override def content(name: String): Future[Option[Array[Byte]]] = Future {
- Option(bucket.get(name)).map(blob => blob.getContent())
- }
-
- override def download(name: String) = Future {
- Option(bucket.get(name)).map { blob =>
- ChannelStream.fromChannel(() => blob.reader(), chunkSize)
- }
- }
-
- override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future {
- val blob = bucket.create(name, Array.emptyByteArray)
- ChannelStream.toChannel(() => blob.writer(), chunkSize)
- }
-
- override def delete(name: String): Future[String] = Future {
- client.delete(BlobId.of(bucketId, name))
- name
- }
-
- override def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]] = Future {
- Option(bucket.get(name)).map(blob => blob.signUrl(duration.length, duration.unit))
- }
-
- override def url(name: String): Future[Option[URL]] = Future {
- val protocol: String = "https"
- val resourcePath: String = s"storage.googleapis.com/${bucket.getName}/"
- Option(bucket.get(name)).map { blob =>
- new URL(protocol, resourcePath, blob.getName)
- }
- }
-}
-
-object GcsBlobStorage {
- final val DefaultChunkSize = 8192
-
- private def newClient(key: InputStream): Storage =
- StorageOptions
- .newBuilder()
- .setCredentials(ServiceAccountCredentials.fromStream(key))
- .build()
- .getService()
-
- def fromKeyfile(keyfile: Path, bucketId: String, chunkSize: Int = DefaultChunkSize)(
- implicit ec: ExecutionContext): GcsBlobStorage = {
- val client = newClient(new FileInputStream(keyfile.toFile))
- new GcsBlobStorage(client, bucketId, chunkSize)
- }
-
-}
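For reference, a GCS-backed store was obtained through the fromKeyfile factory; the key path and bucket name below are placeholders:

    import java.nio.file.Paths
    import scala.concurrent.ExecutionContext.Implicits.global
    import xyz.driver.core.storage.GcsBlobStorage

    // Requires that the bucket already exists and the service account can access it.
    val gcs: GcsBlobStorage =
      GcsBlobStorage.fromKeyfile(Paths.get("/etc/secrets/gcloud-key.json"), bucketId = "my-bucket")
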
diff --git a/src/main/scala/xyz/driver/core/storage/channelStreams.scala b/src/main/scala/xyz/driver/core/storage/channelStreams.scala
deleted file mode 100644
index fc652be..0000000
--- a/src/main/scala/xyz/driver/core/storage/channelStreams.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-package xyz.driver.core.storage
-
-import java.nio.ByteBuffer
-import java.nio.channels.{ReadableByteChannel, WritableByteChannel}
-
-import akka.stream._
-import akka.stream.scaladsl.{Sink, Source}
-import akka.stream.stage._
-import akka.util.ByteString
-import akka.{Done, NotUsed}
-
-import scala.concurrent.{Future, Promise}
-import scala.util.control.NonFatal
-
-class ChannelSource(createChannel: () => ReadableByteChannel, chunkSize: Int)
- extends GraphStage[SourceShape[ByteString]] {
-
- val out = Outlet[ByteString]("ChannelSource.out")
- val shape = SourceShape(out)
-
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
- val channel = createChannel()
-
- object Handler extends OutHandler {
- override def onPull(): Unit = {
- try {
- val buffer = ByteBuffer.allocate(chunkSize)
- if (channel.read(buffer) > 0) {
- buffer.flip()
- push(out, ByteString.fromByteBuffer(buffer))
- } else {
- completeStage()
- }
- } catch {
- case NonFatal(_) =>
- channel.close()
- }
- }
- override def onDownstreamFinish(): Unit = {
- channel.close()
- }
- }
-
- setHandler(out, Handler)
- }
-
-}
-
-class ChannelSink(createChannel: () => WritableByteChannel, chunkSize: Int)
- extends GraphStageWithMaterializedValue[SinkShape[ByteString], Future[Done]] {
-
- val in = Inlet[ByteString]("ChannelSink.in")
- val shape = SinkShape(in)
-
- override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
- val promise = Promise[Done]()
- val logic = new GraphStageLogic(shape) {
- val channel = createChannel()
-
- object Handler extends InHandler {
- override def onPush(): Unit = {
- try {
- val data = grab(in)
- channel.write(data.asByteBuffer)
- pull(in)
- } catch {
- case NonFatal(e) =>
- channel.close()
- promise.failure(e)
- }
- }
-
- override def onUpstreamFinish(): Unit = {
- channel.close()
- completeStage()
- promise.success(Done)
- }
-
- override def onUpstreamFailure(ex: Throwable): Unit = {
- channel.close()
- promise.failure(ex)
- }
- }
-
- setHandler(in, Handler)
-
- override def preStart(): Unit = {
- pull(in)
- }
- }
- (logic, promise.future)
- }
-
-}
-
-object ChannelStream {
-
- def fromChannel(channel: () => ReadableByteChannel, chunkSize: Int = 8192): Source[ByteString, NotUsed] = {
- Source
- .fromGraph(new ChannelSource(channel, chunkSize))
- .withAttributes(Attributes(ActorAttributes.IODispatcher))
- .async
- }
-
- def toChannel(channel: () => WritableByteChannel, chunkSize: Int = 8192): Sink[ByteString, Future[Done]] = {
- Sink
- .fromGraph(new ChannelSink(channel, chunkSize))
- .withAttributes(Attributes(ActorAttributes.IODispatcher))
- .async
- }
-
-}
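ChannelStream adapts blocking NIO channels into akka-streams stages that run on the IO dispatcher; a sketch reading a local file through it (path assumed):

    import java.nio.channels.FileChannel
    import java.nio.file.{Paths, StandardOpenOption}
    import akka.NotUsed
    import akka.stream.scaladsl.Source
    import akka.util.ByteString
    import xyz.driver.core.storage.ChannelStream

    // A fresh channel is opened lazily each time the stream is materialized.
    val bytes: Source[ByteString, NotUsed] = ChannelStream.fromChannel(
      () => FileChannel.open(Paths.get("/tmp/example.bin"), StandardOpenOption.READ))
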
diff --git a/src/test/scala/xyz/driver/core/BlobStorageTest.scala b/src/test/scala/xyz/driver/core/BlobStorageTest.scala
deleted file mode 100644
index 811cc60..0000000
--- a/src/test/scala/xyz/driver/core/BlobStorageTest.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-package xyz.driver.core
-
-import java.nio.file.Files
-
-import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
-import akka.stream.scaladsl._
-import akka.util.ByteString
-import org.scalatest._
-import org.scalatest.concurrent.ScalaFutures
-import xyz.driver.core.storage.{BlobStorage, FileSystemBlobStorage}
-
-import scala.concurrent.Future
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-class BlobStorageTest extends FlatSpec with ScalaFutures {
-
- implicit val patience: PatienceConfig = PatienceConfig(timeout = 100.seconds)
-
- implicit val system = ActorSystem("blobstorage-test")
- implicit val mat = ActorMaterializer()
- import system.dispatcher
-
- def storageBehaviour(storage: BlobStorage) = {
- val key = "foo"
- val data = "hello world".getBytes
- it should "upload data" in {
- assert(storage.exists(key).futureValue === false)
- assert(storage.uploadContent(key, data).futureValue === key)
- assert(storage.exists(key).futureValue === true)
- }
- it should "download data" in {
- val content = storage.content(key).futureValue
- assert(content.isDefined)
- assert(content.get === data)
- }
- it should "not download non-existing data" in {
- assert(storage.content("bar").futureValue.isEmpty)
- }
- it should "overwrite an existing key" in {
- val newData = "new string".getBytes("utf-8")
- assert(storage.uploadContent(key, newData).futureValue === key)
- assert(storage.content(key).futureValue.get === newData)
- }
- it should "upload a file" in {
- val tmp = Files.createTempFile("testfile", "txt")
- Files.write(tmp, data)
- assert(storage.uploadFile(key, tmp).futureValue === key)
- Files.delete(tmp)
- }
- it should "upload content" in {
- val text = "foobar"
- val src = Source
- .single(text)
- .map(l => ByteString(l))
- src.runWith(storage.upload(key).futureValue).futureValue
- assert(storage.content(key).futureValue.map(_.toSeq) === Some("foobar".getBytes.toSeq))
- }
- it should "delete content" in {
- assert(storage.exists(key).futureValue)
- storage.delete(key).futureValue
- assert(!storage.exists(key).futureValue)
- }
- it should "download content" in {
- storage.uploadContent(key, data) futureValue
- val srcOpt = storage.download(key).futureValue
- assert(srcOpt.isDefined)
- val src = srcOpt.get
- val content: Future[Array[Byte]] = src.runWith(Sink.fold(Array[Byte]())(_ ++ _))
- assert(content.futureValue === data)
- }
- it should "list keys" in {
- assert(storage.list("").futureValue === Set(key))
- storage.uploadContent("a/a.txt", data).futureValue
- storage.uploadContent("a/b", data).futureValue
- storage.uploadContent("c/d", data).futureValue
- storage.uploadContent("d", data).futureValue
- assert(storage.list("").futureValue === Set(key, "a", "c", "d"))
- assert(storage.list("a").futureValue === Set("a/a.txt", "a/b"))
- assert(storage.list("a").futureValue === Set("a/a.txt", "a/b"))
- assert(storage.list("c").futureValue === Set("c/d"))
- }
- it should "get valid URL" in {
- assert(storage.exists(key).futureValue === true)
- val fooUrl = storage.url(key).futureValue
- assert(fooUrl.isDefined)
- }
- }
-
- "File system storage" should behave like storageBehaviour(
- new FileSystemBlobStorage(Files.createTempDirectory("test")))
-
-}
diff --git a/src/test/scala/xyz/driver/core/messaging/QueueBusTest.scala b/src/test/scala/xyz/driver/core/messaging/QueueBusTest.scala
deleted file mode 100644
index 8dd0776..0000000
--- a/src/test/scala/xyz/driver/core/messaging/QueueBusTest.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-package xyz.driver.core.messaging
-
-import akka.actor.ActorSystem
-import org.scalatest.FlatSpec
-import org.scalatest.concurrent.ScalaFutures
-
-import scala.concurrent.ExecutionContext
-import scala.concurrent.duration._
-
-class QueueBusTest extends FlatSpec with ScalaFutures {
- implicit val patience: PatienceConfig = PatienceConfig(timeout = 10.seconds)
-
- def busBehaviour(bus: Bus)(implicit ec: ExecutionContext): Unit = {
-
- it should "deliver messages to a subscriber" in {
- val topic = Topic.string("test.topic1")
- bus.fetchMessages(topic).futureValue
- bus.publishMessages(topic, Seq("hello world!"))
- Thread.sleep(100)
- val messages = bus.fetchMessages(topic)
- assert(messages.futureValue.map(_.data).toList == List("hello world!"))
- }
- }
-
- implicit val system: ActorSystem = ActorSystem("queue-test")
- import system.dispatcher
-
- "A queue-based bus" should behave like busBehaviour(new QueueBus)
-
-}