aboutsummaryrefslogtreecommitdiff
path: root/core-storage/src/main/scala/xyz
diff options
context:
space:
mode:
Diffstat (limited to 'core-storage/src/main/scala/xyz')
-rw-r--r--core-storage/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala109
-rw-r--r--core-storage/src/main/scala/xyz/driver/core/storage/BlobStorage.scala50
-rw-r--r--core-storage/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala82
-rw-r--r--core-storage/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala96
-rw-r--r--core-storage/src/main/scala/xyz/driver/core/storage/channelStreams.scala112
5 files changed, 449 insertions, 0 deletions
diff --git a/core-storage/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala b/core-storage/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala
new file mode 100644
index 0000000..388b8f5
--- /dev/null
+++ b/core-storage/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala
@@ -0,0 +1,109 @@
+package xyz.driver.core.storage
+
+import java.io.ByteArrayInputStream
+import java.net.URL
+import java.nio.file.Path
+import java.time.Clock
+import java.util.Date
+
+import akka.Done
+import akka.stream.scaladsl.{Sink, Source, StreamConverters}
+import akka.util.ByteString
+import com.aliyun.oss.OSSClient
+import com.aliyun.oss.model.ObjectPermission
+import com.typesafe.config.Config
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{ExecutionContext, Future}
+
+class AliyunBlobStorage(
+ client: OSSClient,
+ bucketId: String,
+ clock: Clock,
+ chunkSize: Int = AliyunBlobStorage.DefaultChunkSize)(implicit ec: ExecutionContext)
+ extends SignedBlobStorage {
+ override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future {
+ client.putObject(bucketId, name, new ByteArrayInputStream(content))
+ name
+ }
+
+ override def uploadFile(name: String, content: Path): Future[String] = Future {
+ client.putObject(bucketId, name, content.toFile)
+ name
+ }
+
+ override def exists(name: String): Future[Boolean] = Future {
+ client.doesObjectExist(bucketId, name)
+ }
+
+ override def list(prefix: String): Future[Set[String]] = Future {
+ client.listObjects(bucketId, prefix).getObjectSummaries.asScala.map(_.getKey)(collection.breakOut)
+ }
+
+ override def content(name: String): Future[Option[Array[Byte]]] = Future {
+ Option(client.getObject(bucketId, name)).map { obj =>
+ val inputStream = obj.getObjectContent
+ Stream.continually(inputStream.read).takeWhile(_ != -1).map(_.toByte).toArray
+ }
+ }
+
+ override def download(name: String): Future[Option[Source[ByteString, Any]]] = Future {
+ Option(client.getObject(bucketId, name)).map { obj =>
+ StreamConverters.fromInputStream(() => obj.getObjectContent, chunkSize)
+ }
+ }
+
+ override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future {
+ StreamConverters
+ .asInputStream()
+ .mapMaterializedValue(is =>
+ Future {
+ client.putObject(bucketId, name, is)
+ Done
+ })
+ }
+
+ override def delete(name: String): Future[String] = Future {
+ client.deleteObject(bucketId, name)
+ name
+ }
+
+ override def url(name: String): Future[Option[URL]] = Future {
+ // Based on https://www.alibabacloud.com/help/faq-detail/39607.htm
+ Option(client.getObjectAcl(bucketId, name)).map { acl =>
+ val isPrivate = acl.getPermission == ObjectPermission.Private
+ val bucket = client.getBucketInfo(bucketId).getBucket
+ val endpointUrl = if (isPrivate) bucket.getIntranetEndpoint else bucket.getExtranetEndpoint
+ new URL(s"https://$bucketId.$endpointUrl/$name")
+ }
+ }
+
+ override def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]] = Future {
+ if (client.doesObjectExist(bucketId, name)) {
+ val expiration = new Date(clock.millis() + duration.toMillis)
+ Some(client.generatePresignedUrl(bucketId, name, expiration))
+ } else {
+ None
+ }
+ }
+}
+
+object AliyunBlobStorage {
+ val DefaultChunkSize: Int = 8192
+
+ def apply(config: Config, bucketId: String, clock: Clock)(implicit ec: ExecutionContext): AliyunBlobStorage = {
+ val clientId = config.getString("storage.aliyun.clientId")
+ val clientSecret = config.getString("storage.aliyun.clientSecret")
+ val endpoint = config.getString("storage.aliyun.endpoint")
+ this(clientId, clientSecret, endpoint, bucketId, clock)
+ }
+
+ def apply(clientId: String, clientSecret: String, region: String, bucketId: String, clock: Clock)(
+ implicit ec: ExecutionContext): AliyunBlobStorage = {
+ // https://www.alibabacloud.com/help/doc-detail/31837.htm
+ val endpoint = s"https://oss-$region.aliyuncs.com"
+ val client = new OSSClient(endpoint, clientId, clientSecret)
+ new AliyunBlobStorage(client, bucketId, clock)
+ }
+}
diff --git a/core-storage/src/main/scala/xyz/driver/core/storage/BlobStorage.scala b/core-storage/src/main/scala/xyz/driver/core/storage/BlobStorage.scala
new file mode 100644
index 0000000..0cde96a
--- /dev/null
+++ b/core-storage/src/main/scala/xyz/driver/core/storage/BlobStorage.scala
@@ -0,0 +1,50 @@
+package xyz.driver.core.storage
+
+import java.net.URL
+import java.nio.file.Path
+
+import akka.Done
+import akka.stream.scaladsl.{Sink, Source}
+import akka.util.ByteString
+
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+
+/** Binary key-value store, typically implemented by cloud storage. */
+trait BlobStorage {
+
+ /** Upload data by value. */
+ def uploadContent(name: String, content: Array[Byte]): Future[String]
+
+ /** Upload data from an existing file. */
+ def uploadFile(name: String, content: Path): Future[String]
+
+ def exists(name: String): Future[Boolean]
+
+ /** List available keys. The prefix determines which keys should be listed
+ * and depends on the implementation (for instance, a file system backed
+ * blob store will treat a prefix as a directory path). */
+ def list(prefix: String): Future[Set[String]]
+
+ /** Get all the content of a given object. */
+ def content(name: String): Future[Option[Array[Byte]]]
+
+ /** Stream data asynchronously and with backpressure. */
+ def download(name: String): Future[Option[Source[ByteString, Any]]]
+
+ /** Get a sink to upload data. */
+ def upload(name: String): Future[Sink[ByteString, Future[Done]]]
+
+ /** Delete a stored value. */
+ def delete(name: String): Future[String]
+
+ /**
+ * Path to specified resource. Checks that the resource exists and returns None if
+ * it is not found. Depending on the implementation, may throw.
+ */
+ def url(name: String): Future[Option[URL]]
+}
+
+trait SignedBlobStorage extends BlobStorage {
+ def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]]
+}
diff --git a/core-storage/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala b/core-storage/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala
new file mode 100644
index 0000000..e12c73d
--- /dev/null
+++ b/core-storage/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala
@@ -0,0 +1,82 @@
+package xyz.driver.core.storage
+
+import java.net.URL
+import java.nio.file.{Files, Path, StandardCopyOption}
+
+import akka.stream.scaladsl.{FileIO, Sink, Source}
+import akka.util.ByteString
+import akka.{Done, NotUsed}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+
+/** A blob store that is backed by a local filesystem. All objects are stored relative to the given
+ * root path. Slashes ('/') in blob names are treated as usual path separators and are converted
+ * to directories. */
+class FileSystemBlobStorage(root: Path)(implicit ec: ExecutionContext) extends BlobStorage {
+
+ private def ensureParents(file: Path): Path = {
+ Files.createDirectories(file.getParent())
+ file
+ }
+
+ private def file(name: String) = root.resolve(name)
+
+ override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future {
+ Files.write(ensureParents(file(name)), content)
+ name
+ }
+ override def uploadFile(name: String, content: Path): Future[String] = Future {
+ Files.copy(content, ensureParents(file(name)), StandardCopyOption.REPLACE_EXISTING)
+ name
+ }
+
+ override def exists(name: String): Future[Boolean] = Future {
+ val path = file(name)
+ Files.exists(path) && Files.isReadable(path)
+ }
+
+ override def list(prefix: String): Future[Set[String]] = Future {
+ val dir = file(prefix)
+ Files
+ .list(dir)
+ .iterator()
+ .asScala
+ .map(p => root.relativize(p))
+ .map(_.toString)
+ .toSet
+ }
+
+ override def content(name: String): Future[Option[Array[Byte]]] = exists(name) map {
+ case true =>
+ Some(Files.readAllBytes(file(name)))
+ case false => None
+ }
+
+ override def download(name: String): Future[Option[Source[ByteString, NotUsed]]] = Future {
+ if (Files.exists(file(name))) {
+ Some(FileIO.fromPath(file(name)).mapMaterializedValue(_ => NotUsed))
+ } else {
+ None
+ }
+ }
+
+ override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future {
+ val f = ensureParents(file(name))
+ FileIO.toPath(f).mapMaterializedValue(_.map(_ => Done))
+ }
+
+ override def delete(name: String): Future[String] = exists(name).map { e =>
+ if (e) {
+ Files.delete(file(name))
+ }
+ name
+ }
+
+ override def url(name: String): Future[Option[URL]] = exists(name) map {
+ case true =>
+ Some(root.resolve(name).toUri.toURL)
+ case false =>
+ None
+ }
+}
diff --git a/core-storage/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala b/core-storage/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala
new file mode 100644
index 0000000..95164c7
--- /dev/null
+++ b/core-storage/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala
@@ -0,0 +1,96 @@
+package xyz.driver.core.storage
+
+import java.io.{FileInputStream, InputStream}
+import java.net.URL
+import java.nio.file.Path
+
+import akka.Done
+import akka.stream.scaladsl.Sink
+import akka.util.ByteString
+import com.google.api.gax.paging.Page
+import com.google.auth.oauth2.ServiceAccountCredentials
+import com.google.cloud.storage.Storage.BlobListOption
+import com.google.cloud.storage.{Blob, BlobId, Bucket, Storage, StorageOptions}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{ExecutionContext, Future}
+
+class GcsBlobStorage(client: Storage, bucketId: String, chunkSize: Int = GcsBlobStorage.DefaultChunkSize)(
+ implicit ec: ExecutionContext)
+ extends BlobStorage with SignedBlobStorage {
+
+ private val bucket: Bucket = client.get(bucketId)
+ require(bucket != null, s"Bucket $bucketId does not exist.")
+
+ override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future {
+ bucket.create(name, content).getBlobId.getName
+ }
+
+ override def uploadFile(name: String, content: Path): Future[String] = Future {
+ bucket.create(name, new FileInputStream(content.toFile)).getBlobId.getName
+ }
+
+ override def exists(name: String): Future[Boolean] = Future {
+ bucket.get(name) != null
+ }
+
+ override def list(prefix: String): Future[Set[String]] = Future {
+ val page: Page[Blob] = bucket.list(BlobListOption.prefix(prefix))
+ page
+ .iterateAll()
+ .asScala
+ .map(_.getName())
+ .toSet
+ }
+
+ override def content(name: String): Future[Option[Array[Byte]]] = Future {
+ Option(bucket.get(name)).map(blob => blob.getContent())
+ }
+
+ override def download(name: String) = Future {
+ Option(bucket.get(name)).map { blob =>
+ ChannelStream.fromChannel(() => blob.reader(), chunkSize)
+ }
+ }
+
+ override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future {
+ val blob = bucket.create(name, Array.emptyByteArray)
+ ChannelStream.toChannel(() => blob.writer(), chunkSize)
+ }
+
+ override def delete(name: String): Future[String] = Future {
+ client.delete(BlobId.of(bucketId, name))
+ name
+ }
+
+ override def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]] = Future {
+ Option(bucket.get(name)).map(blob => blob.signUrl(duration.length, duration.unit))
+ }
+
+ override def url(name: String): Future[Option[URL]] = Future {
+ val protocol: String = "https"
+ val resourcePath: String = s"storage.googleapis.com/${bucket.getName}/"
+ Option(bucket.get(name)).map { blob =>
+ new URL(protocol, resourcePath, blob.getName)
+ }
+ }
+}
+
+object GcsBlobStorage {
+ final val DefaultChunkSize = 8192
+
+ private def newClient(key: InputStream): Storage =
+ StorageOptions
+ .newBuilder()
+ .setCredentials(ServiceAccountCredentials.fromStream(key))
+ .build()
+ .getService()
+
+ def fromKeyfile(keyfile: Path, bucketId: String, chunkSize: Int = DefaultChunkSize)(
+ implicit ec: ExecutionContext): GcsBlobStorage = {
+ val client = newClient(new FileInputStream(keyfile.toFile))
+ new GcsBlobStorage(client, bucketId, chunkSize)
+ }
+
+}
diff --git a/core-storage/src/main/scala/xyz/driver/core/storage/channelStreams.scala b/core-storage/src/main/scala/xyz/driver/core/storage/channelStreams.scala
new file mode 100644
index 0000000..fc652be
--- /dev/null
+++ b/core-storage/src/main/scala/xyz/driver/core/storage/channelStreams.scala
@@ -0,0 +1,112 @@
+package xyz.driver.core.storage
+
+import java.nio.ByteBuffer
+import java.nio.channels.{ReadableByteChannel, WritableByteChannel}
+
+import akka.stream._
+import akka.stream.scaladsl.{Sink, Source}
+import akka.stream.stage._
+import akka.util.ByteString
+import akka.{Done, NotUsed}
+
+import scala.concurrent.{Future, Promise}
+import scala.util.control.NonFatal
+
+class ChannelSource(createChannel: () => ReadableByteChannel, chunkSize: Int)
+ extends GraphStage[SourceShape[ByteString]] {
+
+ val out = Outlet[ByteString]("ChannelSource.out")
+ val shape = SourceShape(out)
+
+ override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
+ val channel = createChannel()
+
+ object Handler extends OutHandler {
+ override def onPull(): Unit = {
+ try {
+ val buffer = ByteBuffer.allocate(chunkSize)
+ if (channel.read(buffer) > 0) {
+ buffer.flip()
+ push(out, ByteString.fromByteBuffer(buffer))
+ } else {
+ completeStage()
+ }
+ } catch {
+ case NonFatal(_) =>
+ channel.close()
+ }
+ }
+ override def onDownstreamFinish(): Unit = {
+ channel.close()
+ }
+ }
+
+ setHandler(out, Handler)
+ }
+
+}
+
+class ChannelSink(createChannel: () => WritableByteChannel, chunkSize: Int)
+ extends GraphStageWithMaterializedValue[SinkShape[ByteString], Future[Done]] {
+
+ val in = Inlet[ByteString]("ChannelSink.in")
+ val shape = SinkShape(in)
+
+ override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
+ val promise = Promise[Done]()
+ val logic = new GraphStageLogic(shape) {
+ val channel = createChannel()
+
+ object Handler extends InHandler {
+ override def onPush(): Unit = {
+ try {
+ val data = grab(in)
+ channel.write(data.asByteBuffer)
+ pull(in)
+ } catch {
+ case NonFatal(e) =>
+ channel.close()
+ promise.failure(e)
+ }
+ }
+
+ override def onUpstreamFinish(): Unit = {
+ channel.close()
+ completeStage()
+ promise.success(Done)
+ }
+
+ override def onUpstreamFailure(ex: Throwable): Unit = {
+ channel.close()
+ promise.failure(ex)
+ }
+ }
+
+ setHandler(in, Handler)
+
+ override def preStart(): Unit = {
+ pull(in)
+ }
+ }
+ (logic, promise.future)
+ }
+
+}
+
+object ChannelStream {
+
+ def fromChannel(channel: () => ReadableByteChannel, chunkSize: Int = 8192): Source[ByteString, NotUsed] = {
+ Source
+ .fromGraph(new ChannelSource(channel, chunkSize))
+ .withAttributes(Attributes(ActorAttributes.IODispatcher))
+ .async
+ }
+
+ def toChannel(channel: () => WritableByteChannel, chunkSize: Int = 8192): Sink[ByteString, Future[Done]] = {
+ Sink
+ .fromGraph(new ChannelSink(channel, chunkSize))
+ .withAttributes(Attributes(ActorAttributes.IODispatcher))
+ .async
+ }
+
+}