From 5a48dbf20422537e123e7e0cfddd57570abc3fa4 Mon Sep 17 00:00:00 2001 From: Zach Smith Date: Wed, 1 Aug 2018 13:05:56 -0700 Subject: Add Aliyun OSS blob storage support (#187) * Add Aliyun OSS blob storage support * Fix bugs, add convencience constructors, add signed URL support --- build.sbt | 1 + .../driver/core/storage/AliyunBlobStorage.scala | 108 +++++++++++++++++++++ .../xyz/driver/core/storage/BlobStorage.scala | 4 +- 3 files changed, 111 insertions(+), 2 deletions(-) create mode 100644 src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala diff --git a/build.sbt b/build.sbt index e8d9ba7..ca695fb 100644 --- a/build.sbt +++ b/build.sbt @@ -33,6 +33,7 @@ lazy val core = (project in file(".")) "com.amazonaws" % "aws-java-sdk-s3" % "1.11.342", "com.google.cloud" % "google-cloud-pubsub" % "1.31.0", "com.google.cloud" % "google-cloud-storage" % "1.31.0", + "com.aliyun.oss" % "aliyun-sdk-oss" % "2.8.2", "com.typesafe" % "config" % "1.3.3", "ch.qos.logback" % "logback-classic" % "1.2.3", "ch.qos.logback.contrib" % "logback-json-classic" % "0.1.5", diff --git a/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala b/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala new file mode 100644 index 0000000..b5e8678 --- /dev/null +++ b/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala @@ -0,0 +1,108 @@ +package xyz.driver.core.storage + +import java.io.ByteArrayInputStream +import java.net.URL +import java.nio.file.Path +import java.util.Date + +import akka.Done +import akka.stream.scaladsl.{Sink, Source, StreamConverters} +import akka.util.ByteString +import com.aliyun.oss.OSSClient +import com.aliyun.oss.model.ObjectPermission +import com.typesafe.config.Config +import xyz.driver.core.time.provider.TimeProvider + +import scala.collection.JavaConverters._ +import scala.concurrent.duration.Duration +import scala.concurrent.{ExecutionContext, Future} + +class AliyunBlobStorage( + client: OSSClient, + bucketId: String, + timeProvider: TimeProvider, + chunkSize: Int = AliyunBlobStorage.DefaultChunkSize)(implicit ec: ExecutionContext) + extends SignedBlobStorage { + override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future { + client.putObject(bucketId, name, new ByteArrayInputStream(content)) + name + } + + override def uploadFile(name: String, content: Path): Future[String] = Future { + client.putObject(bucketId, name, content.toFile) + name + } + + override def exists(name: String): Future[Boolean] = Future { + client.doesObjectExist(bucketId, name) + } + + override def list(prefix: String): Future[Set[String]] = Future { + client.listObjects(bucketId, prefix).getObjectSummaries.asScala.map(_.getKey)(collection.breakOut) + } + + override def content(name: String): Future[Option[Array[Byte]]] = Future { + Option(client.getObject(bucketId, name)).map { obj => + val inputStream = obj.getObjectContent + Stream.continually(inputStream.read).takeWhile(_ != -1).map(_.toByte).toArray + } + } + + override def download(name: String): Future[Option[Source[ByteString, Any]]] = Future { + Option(client.getObject(bucketId, name)).map { obj => + StreamConverters.fromInputStream(() => obj.getObjectContent, chunkSize) + } + } + + override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future { + StreamConverters + .asInputStream() + .mapMaterializedValue(is => + Future { + client.putObject(bucketId, name, is) + Done + }) + } + + override def delete(name: String): Future[String] = Future { + client.deleteObject(bucketId, name) + name + } + + override def url(name: String): Future[Option[URL]] = Future { + // Based on https://www.alibabacloud.com/help/faq-detail/39607.htm + Option(client.getObjectAcl(bucketId, name)).map { acl => + val isPrivate = acl.getPermission == ObjectPermission.Private + val bucket = client.getBucketInfo(bucketId).getBucket + val endpointUrl = if (isPrivate) bucket.getIntranetEndpoint else bucket.getExtranetEndpoint + new URL(s"https://$bucketId.$endpointUrl/$name") + } + } + + override def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]] = Future { + if (client.doesObjectExist(bucketId, name)) { + val expiration = new Date(timeProvider.currentTime().advanceBy(duration).millis) + Some(client.generatePresignedUrl(bucketId, name, expiration)) + } else { + None + } + } +} + +object AliyunBlobStorage { + val DefaultChunkSize: Int = 8192 + + def apply(config: Config, bucketId: String, timeProvider: TimeProvider)( + implicit ec: ExecutionContext): AliyunBlobStorage = { + val clientId = config.getString("storage.aliyun.clientId") + val clientSecret = config.getString("storage.aliyun.clientSecret") + val endpoint = config.getString("storage.aliyun.endpoint") + this(clientId, clientSecret, endpoint, bucketId, timeProvider) + } + + def apply(clientId: String, clientSecret: String, endpoint: String, bucketId: String, timeProvider: TimeProvider)( + implicit ec: ExecutionContext): AliyunBlobStorage = { + val client = new OSSClient(endpoint, clientId, clientSecret) + new AliyunBlobStorage(client, bucketId, timeProvider) + } +} diff --git a/src/main/scala/xyz/driver/core/storage/BlobStorage.scala b/src/main/scala/xyz/driver/core/storage/BlobStorage.scala index ee6c5d7..0cde96a 100644 --- a/src/main/scala/xyz/driver/core/storage/BlobStorage.scala +++ b/src/main/scala/xyz/driver/core/storage/BlobStorage.scala @@ -3,9 +3,9 @@ package xyz.driver.core.storage import java.net.URL import java.nio.file.Path +import akka.Done import akka.stream.scaladsl.{Sink, Source} import akka.util.ByteString -import akka.{Done, NotUsed} import scala.concurrent.Future import scala.concurrent.duration.Duration @@ -30,7 +30,7 @@ trait BlobStorage { def content(name: String): Future[Option[Array[Byte]]] /** Stream data asynchronously and with backpressure. */ - def download(name: String): Future[Option[Source[ByteString, NotUsed]]] + def download(name: String): Future[Option[Source[ByteString, Any]]] /** Get a sink to upload data. */ def upload(name: String): Future[Sink[ByteString, Future[Done]]] -- cgit v1.2.3