Diffstat (limited to 'src/main/scala/xyz/driver/core/storage')
-rw-r--r--  src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala      108
-rw-r--r--  src/main/scala/xyz/driver/core/storage/BlobStorage.scala             50
-rw-r--r--  src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala   82
-rw-r--r--  src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala          96
-rw-r--r--  src/main/scala/xyz/driver/core/storage/channelStreams.scala         112
5 files changed, 0 insertions, 448 deletions
diff --git a/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala b/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala
deleted file mode 100644
index b5e8678..0000000
--- a/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-package xyz.driver.core.storage
-
-import java.io.ByteArrayInputStream
-import java.net.URL
-import java.nio.file.Path
-import java.util.Date
-
-import akka.Done
-import akka.stream.scaladsl.{Sink, Source, StreamConverters}
-import akka.util.ByteString
-import com.aliyun.oss.OSSClient
-import com.aliyun.oss.model.ObjectPermission
-import com.typesafe.config.Config
-import xyz.driver.core.time.provider.TimeProvider
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration.Duration
-import scala.concurrent.{ExecutionContext, Future}
-
-class AliyunBlobStorage(
- client: OSSClient,
- bucketId: String,
- timeProvider: TimeProvider,
- chunkSize: Int = AliyunBlobStorage.DefaultChunkSize)(implicit ec: ExecutionContext)
- extends SignedBlobStorage {
- override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future {
- client.putObject(bucketId, name, new ByteArrayInputStream(content))
- name
- }
-
- override def uploadFile(name: String, content: Path): Future[String] = Future {
- client.putObject(bucketId, name, content.toFile)
- name
- }
-
- override def exists(name: String): Future[Boolean] = Future {
- client.doesObjectExist(bucketId, name)
- }
-
- override def list(prefix: String): Future[Set[String]] = Future {
- client.listObjects(bucketId, prefix).getObjectSummaries.asScala.map(_.getKey)(collection.breakOut)
- }
-
- override def content(name: String): Future[Option[Array[Byte]]] = Future {
- Option(client.getObject(bucketId, name)).map { obj =>
- val inputStream = obj.getObjectContent
- Stream.continually(inputStream.read).takeWhile(_ != -1).map(_.toByte).toArray
- }
- }
-
- override def download(name: String): Future[Option[Source[ByteString, Any]]] = Future {
- Option(client.getObject(bucketId, name)).map { obj =>
- StreamConverters.fromInputStream(() => obj.getObjectContent, chunkSize)
- }
- }
-
- override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future {
- StreamConverters
- .asInputStream()
- .mapMaterializedValue(is =>
- Future {
- client.putObject(bucketId, name, is)
- Done
- })
- }
-
- override def delete(name: String): Future[String] = Future {
- client.deleteObject(bucketId, name)
- name
- }
-
- override def url(name: String): Future[Option[URL]] = Future {
- // Based on https://www.alibabacloud.com/help/faq-detail/39607.htm
- Option(client.getObjectAcl(bucketId, name)).map { acl =>
- val isPrivate = acl.getPermission == ObjectPermission.Private
- val bucket = client.getBucketInfo(bucketId).getBucket
- val endpointUrl = if (isPrivate) bucket.getIntranetEndpoint else bucket.getExtranetEndpoint
- new URL(s"https://$bucketId.$endpointUrl/$name")
- }
- }
-
- override def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]] = Future {
- if (client.doesObjectExist(bucketId, name)) {
- val expiration = new Date(timeProvider.currentTime().advanceBy(duration).millis)
- Some(client.generatePresignedUrl(bucketId, name, expiration))
- } else {
- None
- }
- }
-}
-
-object AliyunBlobStorage {
- val DefaultChunkSize: Int = 8192
-
- def apply(config: Config, bucketId: String, timeProvider: TimeProvider)(
- implicit ec: ExecutionContext): AliyunBlobStorage = {
- val clientId = config.getString("storage.aliyun.clientId")
- val clientSecret = config.getString("storage.aliyun.clientSecret")
- val endpoint = config.getString("storage.aliyun.endpoint")
- this(clientId, clientSecret, endpoint, bucketId, timeProvider)
- }
-
- def apply(clientId: String, clientSecret: String, endpoint: String, bucketId: String, timeProvider: TimeProvider)(
- implicit ec: ExecutionContext): AliyunBlobStorage = {
- val client = new OSSClient(endpoint, clientId, clientSecret)
- new AliyunBlobStorage(client, bucketId, timeProvider)
- }
-}
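
For reference, a minimal sketch of how the removed AliyunBlobStorage was wired from Typesafe config through the companion `apply` above. The bucket name and the surrounding helper are assumptions for illustration, not taken from the diff:

import com.typesafe.config.ConfigFactory
import xyz.driver.core.time.provider.TimeProvider
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical wiring helper; the config must define
// storage.aliyun.{clientId, clientSecret, endpoint}.
def makeAliyunStore(timeProvider: TimeProvider): SignedBlobStorage = {
  val config = ConfigFactory.load()
  AliyunBlobStorage(config, bucketId = "reports", timeProvider)
}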
diff --git a/src/main/scala/xyz/driver/core/storage/BlobStorage.scala b/src/main/scala/xyz/driver/core/storage/BlobStorage.scala
deleted file mode 100644
index 0cde96a..0000000
--- a/src/main/scala/xyz/driver/core/storage/BlobStorage.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-package xyz.driver.core.storage
-
-import java.net.URL
-import java.nio.file.Path
-
-import akka.Done
-import akka.stream.scaladsl.{Sink, Source}
-import akka.util.ByteString
-
-import scala.concurrent.Future
-import scala.concurrent.duration.Duration
-
-/** Binary key-value store, typically implemented by cloud storage. */
-trait BlobStorage {
-
- /** Upload data by value. */
- def uploadContent(name: String, content: Array[Byte]): Future[String]
-
- /** Upload data from an existing file. */
- def uploadFile(name: String, content: Path): Future[String]
-
- def exists(name: String): Future[Boolean]
-
- /** List available keys. The prefix determines which keys should be listed
- * and depends on the implementation (for instance, a file system backed
- * blob store will treat a prefix as a directory path). */
- def list(prefix: String): Future[Set[String]]
-
- /** Get all the content of a given object. */
- def content(name: String): Future[Option[Array[Byte]]]
-
- /** Stream data asynchronously and with backpressure. */
- def download(name: String): Future[Option[Source[ByteString, Any]]]
-
- /** Get a sink to upload data. */
- def upload(name: String): Future[Sink[ByteString, Future[Done]]]
-
- /** Delete a stored value. */
- def delete(name: String): Future[String]
-
- /**
- * Path to specified resource. Checks that the resource exists and returns None if
- * it is not found. Depending on the implementation, may throw.
- */
- def url(name: String): Future[Option[URL]]
-}
-
-trait SignedBlobStorage extends BlobStorage {
- def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]]
-}
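
A usage sketch of the removed BlobStorage interface, independent of any particular backend. The helper name `roundTrip` and the blob key are hypothetical; an implicit Materializer and ExecutionContext are assumed in scope:

import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import akka.util.ByteString
import scala.concurrent.{ExecutionContext, Future}

// Uploads a small blob and streams it back, concatenating the chunks.
def roundTrip(store: BlobStorage)(implicit mat: Materializer, ec: ExecutionContext): Future[Option[String]] =
  for {
    _     <- store.uploadContent("notes/hello.txt", "hello".getBytes("UTF-8"))
    src   <- store.download("notes/hello.txt")
    bytes <- src match {
               case Some(s) => s.runWith(Sink.fold(ByteString.empty)(_ ++ _)).map(Some(_))
               case None    => Future.successful(None)
             }
  } yield bytes.map(_.utf8String)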
diff --git a/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala b/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala
deleted file mode 100644
index e12c73d..0000000
--- a/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-package xyz.driver.core.storage
-
-import java.net.URL
-import java.nio.file.{Files, Path, StandardCopyOption}
-
-import akka.stream.scaladsl.{FileIO, Sink, Source}
-import akka.util.ByteString
-import akka.{Done, NotUsed}
-
-import scala.collection.JavaConverters._
-import scala.concurrent.{ExecutionContext, Future}
-
-/** A blob store that is backed by a local filesystem. All objects are stored relative to the given
- * root path. Slashes ('/') in blob names are treated as usual path separators and are converted
- * to directories. */
-class FileSystemBlobStorage(root: Path)(implicit ec: ExecutionContext) extends BlobStorage {
-
- private def ensureParents(file: Path): Path = {
- Files.createDirectories(file.getParent())
- file
- }
-
- private def file(name: String) = root.resolve(name)
-
- override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future {
- Files.write(ensureParents(file(name)), content)
- name
- }
- override def uploadFile(name: String, content: Path): Future[String] = Future {
- Files.copy(content, ensureParents(file(name)), StandardCopyOption.REPLACE_EXISTING)
- name
- }
-
- override def exists(name: String): Future[Boolean] = Future {
- val path = file(name)
- Files.exists(path) && Files.isReadable(path)
- }
-
- override def list(prefix: String): Future[Set[String]] = Future {
- val dir = file(prefix)
- Files
- .list(dir)
- .iterator()
- .asScala
- .map(p => root.relativize(p))
- .map(_.toString)
- .toSet
- }
-
- override def content(name: String): Future[Option[Array[Byte]]] = exists(name) map {
- case true =>
- Some(Files.readAllBytes(file(name)))
- case false => None
- }
-
- override def download(name: String): Future[Option[Source[ByteString, NotUsed]]] = Future {
- if (Files.exists(file(name))) {
- Some(FileIO.fromPath(file(name)).mapMaterializedValue(_ => NotUsed))
- } else {
- None
- }
- }
-
- override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future {
- val f = ensureParents(file(name))
- FileIO.toPath(f).mapMaterializedValue(_.map(_ => Done))
- }
-
- override def delete(name: String): Future[String] = exists(name).map { e =>
- if (e) {
- Files.delete(file(name))
- }
- name
- }
-
- override def url(name: String): Future[Option[URL]] = exists(name) map {
- case true =>
- Some(root.resolve(name).toUri.toURL)
- case false =>
- None
- }
-}
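
The filesystem implementation above needs nothing beyond a root directory, which made it convenient for tests. A small sketch (the directory prefix is arbitrary):

import java.nio.file.Files
import scala.concurrent.ExecutionContext.Implicits.global

// Stores blobs under a throwaway temp directory; slashes in names become subdirectories.
val root       = Files.createTempDirectory("blob-storage-test")
val localStore = new FileSystemBlobStorage(root)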
diff --git a/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala b/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala
deleted file mode 100644
index 95164c7..0000000
--- a/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-package xyz.driver.core.storage
-
-import java.io.{FileInputStream, InputStream}
-import java.net.URL
-import java.nio.file.Path
-
-import akka.Done
-import akka.stream.scaladsl.Sink
-import akka.util.ByteString
-import com.google.api.gax.paging.Page
-import com.google.auth.oauth2.ServiceAccountCredentials
-import com.google.cloud.storage.Storage.BlobListOption
-import com.google.cloud.storage.{Blob, BlobId, Bucket, Storage, StorageOptions}
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration.Duration
-import scala.concurrent.{ExecutionContext, Future}
-
-class GcsBlobStorage(client: Storage, bucketId: String, chunkSize: Int = GcsBlobStorage.DefaultChunkSize)(
- implicit ec: ExecutionContext)
- extends BlobStorage with SignedBlobStorage {
-
- private val bucket: Bucket = client.get(bucketId)
- require(bucket != null, s"Bucket $bucketId does not exist.")
-
- override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future {
- bucket.create(name, content).getBlobId.getName
- }
-
- override def uploadFile(name: String, content: Path): Future[String] = Future {
- bucket.create(name, new FileInputStream(content.toFile)).getBlobId.getName
- }
-
- override def exists(name: String): Future[Boolean] = Future {
- bucket.get(name) != null
- }
-
- override def list(prefix: String): Future[Set[String]] = Future {
- val page: Page[Blob] = bucket.list(BlobListOption.prefix(prefix))
- page
- .iterateAll()
- .asScala
- .map(_.getName())
- .toSet
- }
-
- override def content(name: String): Future[Option[Array[Byte]]] = Future {
- Option(bucket.get(name)).map(blob => blob.getContent())
- }
-
- override def download(name: String) = Future {
- Option(bucket.get(name)).map { blob =>
- ChannelStream.fromChannel(() => blob.reader(), chunkSize)
- }
- }
-
- override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future {
- val blob = bucket.create(name, Array.emptyByteArray)
- ChannelStream.toChannel(() => blob.writer(), chunkSize)
- }
-
- override def delete(name: String): Future[String] = Future {
- client.delete(BlobId.of(bucketId, name))
- name
- }
-
- override def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]] = Future {
- Option(bucket.get(name)).map(blob => blob.signUrl(duration.length, duration.unit))
- }
-
- override def url(name: String): Future[Option[URL]] = Future {
- val protocol: String = "https"
- val resourcePath: String = s"storage.googleapis.com/${bucket.getName}/"
- Option(bucket.get(name)).map { blob =>
- new URL(protocol, resourcePath, blob.getName)
- }
- }
-}
-
-object GcsBlobStorage {
- final val DefaultChunkSize = 8192
-
- private def newClient(key: InputStream): Storage =
- StorageOptions
- .newBuilder()
- .setCredentials(ServiceAccountCredentials.fromStream(key))
- .build()
- .getService()
-
- def fromKeyfile(keyfile: Path, bucketId: String, chunkSize: Int = DefaultChunkSize)(
- implicit ec: ExecutionContext): GcsBlobStorage = {
- val client = newClient(new FileInputStream(keyfile.toFile))
- new GcsBlobStorage(client, bucketId, chunkSize)
- }
-
-}
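
A sketch of constructing the removed GcsBlobStorage through `fromKeyfile` and requesting a time-limited link. The key file path, bucket name, and object key are assumptions:

import java.nio.file.Paths
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val gcs = GcsBlobStorage.fromKeyfile(Paths.get("/etc/gcs/service-account.json"), "my-bucket")
// Presigned URL valid for one hour, backed by Blob.signUrl as in the code above.
val link = gcs.signedDownloadUrl("reports/2018-01.pdf", 1.hour)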
diff --git a/src/main/scala/xyz/driver/core/storage/channelStreams.scala b/src/main/scala/xyz/driver/core/storage/channelStreams.scala
deleted file mode 100644
index fc652be..0000000
--- a/src/main/scala/xyz/driver/core/storage/channelStreams.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-package xyz.driver.core.storage
-
-import java.nio.ByteBuffer
-import java.nio.channels.{ReadableByteChannel, WritableByteChannel}
-
-import akka.stream._
-import akka.stream.scaladsl.{Sink, Source}
-import akka.stream.stage._
-import akka.util.ByteString
-import akka.{Done, NotUsed}
-
-import scala.concurrent.{Future, Promise}
-import scala.util.control.NonFatal
-
-class ChannelSource(createChannel: () => ReadableByteChannel, chunkSize: Int)
- extends GraphStage[SourceShape[ByteString]] {
-
- val out = Outlet[ByteString]("ChannelSource.out")
- val shape = SourceShape(out)
-
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
- val channel = createChannel()
-
- object Handler extends OutHandler {
- override def onPull(): Unit = {
- try {
- val buffer = ByteBuffer.allocate(chunkSize)
- if (channel.read(buffer) > 0) {
- buffer.flip()
- push(out, ByteString.fromByteBuffer(buffer))
- } else {
- completeStage()
- }
- } catch {
- case NonFatal(_) =>
- channel.close()
- }
- }
- override def onDownstreamFinish(): Unit = {
- channel.close()
- }
- }
-
- setHandler(out, Handler)
- }
-
-}
-
-class ChannelSink(createChannel: () => WritableByteChannel, chunkSize: Int)
- extends GraphStageWithMaterializedValue[SinkShape[ByteString], Future[Done]] {
-
- val in = Inlet[ByteString]("ChannelSink.in")
- val shape = SinkShape(in)
-
- override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
- val promise = Promise[Done]()
- val logic = new GraphStageLogic(shape) {
- val channel = createChannel()
-
- object Handler extends InHandler {
- override def onPush(): Unit = {
- try {
- val data = grab(in)
- channel.write(data.asByteBuffer)
- pull(in)
- } catch {
- case NonFatal(e) =>
- channel.close()
- promise.failure(e)
- }
- }
-
- override def onUpstreamFinish(): Unit = {
- channel.close()
- completeStage()
- promise.success(Done)
- }
-
- override def onUpstreamFailure(ex: Throwable): Unit = {
- channel.close()
- promise.failure(ex)
- }
- }
-
- setHandler(in, Handler)
-
- override def preStart(): Unit = {
- pull(in)
- }
- }
- (logic, promise.future)
- }
-
-}
-
-object ChannelStream {
-
- def fromChannel(channel: () => ReadableByteChannel, chunkSize: Int = 8192): Source[ByteString, NotUsed] = {
- Source
- .fromGraph(new ChannelSource(channel, chunkSize))
- .withAttributes(Attributes(ActorAttributes.IODispatcher))
- .async
- }
-
- def toChannel(channel: () => WritableByteChannel, chunkSize: Int = 8192): Sink[ByteString, Future[Done]] = {
- Sink
- .fromGraph(new ChannelSink(channel, chunkSize))
- .withAttributes(Attributes(ActorAttributes.IODispatcher))
- .async
- }
-
-}
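
The ChannelStream adapters above are not GCS-specific; they wrap any blocking NIO channel in an akka-streams stage that runs on the IO dispatcher. A sketch with a plain file channel (the path is hypothetical):

import java.nio.file.{Files, Paths, StandardOpenOption}
import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.util.ByteString

// Reads the file in 8192-byte chunks through ChannelSource.
val fileSource: Source[ByteString, NotUsed] =
  ChannelStream.fromChannel(() => Files.newByteChannel(Paths.get("/tmp/example.bin"), StandardOpenOption.READ))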