aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZach Smith <zach@driver.xyz>2018-08-01 13:05:56 -0700
committerGitHub <noreply@github.com>2018-08-01 13:05:56 -0700
commit5a48dbf20422537e123e7e0cfddd57570abc3fa4 (patch)
treed060c2a2fafb5eb093c4651a86f7f28fc1186b88
parente6552f3b31b55396c652c196c5c3a9c3a6cfed71 (diff)
downloaddriver-core-5a48dbf20422537e123e7e0cfddd57570abc3fa4.tar.gz
driver-core-5a48dbf20422537e123e7e0cfddd57570abc3fa4.tar.bz2
driver-core-5a48dbf20422537e123e7e0cfddd57570abc3fa4.zip
Add Aliyun OSS blob storage support (#187)
* Add Aliyun OSS blob storage support * Fix bugs, add convencience constructors, add signed URL support
-rw-r--r--build.sbt1
-rw-r--r--src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala108
-rw-r--r--src/main/scala/xyz/driver/core/storage/BlobStorage.scala4
3 files changed, 111 insertions, 2 deletions
diff --git a/build.sbt b/build.sbt
index e8d9ba7..ca695fb 100644
--- a/build.sbt
+++ b/build.sbt
@@ -33,6 +33,7 @@ lazy val core = (project in file("."))
"com.amazonaws" % "aws-java-sdk-s3" % "1.11.342",
"com.google.cloud" % "google-cloud-pubsub" % "1.31.0",
"com.google.cloud" % "google-cloud-storage" % "1.31.0",
+ "com.aliyun.oss" % "aliyun-sdk-oss" % "2.8.2",
"com.typesafe" % "config" % "1.3.3",
"ch.qos.logback" % "logback-classic" % "1.2.3",
"ch.qos.logback.contrib" % "logback-json-classic" % "0.1.5",
diff --git a/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala b/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala
new file mode 100644
index 0000000..b5e8678
--- /dev/null
+++ b/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala
@@ -0,0 +1,108 @@
+package xyz.driver.core.storage
+
+import java.io.ByteArrayInputStream
+import java.net.URL
+import java.nio.file.Path
+import java.util.Date
+
+import akka.Done
+import akka.stream.scaladsl.{Sink, Source, StreamConverters}
+import akka.util.ByteString
+import com.aliyun.oss.OSSClient
+import com.aliyun.oss.model.ObjectPermission
+import com.typesafe.config.Config
+import xyz.driver.core.time.provider.TimeProvider
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{ExecutionContext, Future}
+
+class AliyunBlobStorage(
+ client: OSSClient,
+ bucketId: String,
+ timeProvider: TimeProvider,
+ chunkSize: Int = AliyunBlobStorage.DefaultChunkSize)(implicit ec: ExecutionContext)
+ extends SignedBlobStorage {
+ override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future {
+ client.putObject(bucketId, name, new ByteArrayInputStream(content))
+ name
+ }
+
+ override def uploadFile(name: String, content: Path): Future[String] = Future {
+ client.putObject(bucketId, name, content.toFile)
+ name
+ }
+
+ override def exists(name: String): Future[Boolean] = Future {
+ client.doesObjectExist(bucketId, name)
+ }
+
+ override def list(prefix: String): Future[Set[String]] = Future {
+ client.listObjects(bucketId, prefix).getObjectSummaries.asScala.map(_.getKey)(collection.breakOut)
+ }
+
+ override def content(name: String): Future[Option[Array[Byte]]] = Future {
+ Option(client.getObject(bucketId, name)).map { obj =>
+ val inputStream = obj.getObjectContent
+ Stream.continually(inputStream.read).takeWhile(_ != -1).map(_.toByte).toArray
+ }
+ }
+
+ override def download(name: String): Future[Option[Source[ByteString, Any]]] = Future {
+ Option(client.getObject(bucketId, name)).map { obj =>
+ StreamConverters.fromInputStream(() => obj.getObjectContent, chunkSize)
+ }
+ }
+
+ override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future {
+ StreamConverters
+ .asInputStream()
+ .mapMaterializedValue(is =>
+ Future {
+ client.putObject(bucketId, name, is)
+ Done
+ })
+ }
+
+ override def delete(name: String): Future[String] = Future {
+ client.deleteObject(bucketId, name)
+ name
+ }
+
+ override def url(name: String): Future[Option[URL]] = Future {
+ // Based on https://www.alibabacloud.com/help/faq-detail/39607.htm
+ Option(client.getObjectAcl(bucketId, name)).map { acl =>
+ val isPrivate = acl.getPermission == ObjectPermission.Private
+ val bucket = client.getBucketInfo(bucketId).getBucket
+ val endpointUrl = if (isPrivate) bucket.getIntranetEndpoint else bucket.getExtranetEndpoint
+ new URL(s"https://$bucketId.$endpointUrl/$name")
+ }
+ }
+
+ override def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]] = Future {
+ if (client.doesObjectExist(bucketId, name)) {
+ val expiration = new Date(timeProvider.currentTime().advanceBy(duration).millis)
+ Some(client.generatePresignedUrl(bucketId, name, expiration))
+ } else {
+ None
+ }
+ }
+}
+
+object AliyunBlobStorage {
+ val DefaultChunkSize: Int = 8192
+
+ def apply(config: Config, bucketId: String, timeProvider: TimeProvider)(
+ implicit ec: ExecutionContext): AliyunBlobStorage = {
+ val clientId = config.getString("storage.aliyun.clientId")
+ val clientSecret = config.getString("storage.aliyun.clientSecret")
+ val endpoint = config.getString("storage.aliyun.endpoint")
+ this(clientId, clientSecret, endpoint, bucketId, timeProvider)
+ }
+
+ def apply(clientId: String, clientSecret: String, endpoint: String, bucketId: String, timeProvider: TimeProvider)(
+ implicit ec: ExecutionContext): AliyunBlobStorage = {
+ val client = new OSSClient(endpoint, clientId, clientSecret)
+ new AliyunBlobStorage(client, bucketId, timeProvider)
+ }
+}
diff --git a/src/main/scala/xyz/driver/core/storage/BlobStorage.scala b/src/main/scala/xyz/driver/core/storage/BlobStorage.scala
index ee6c5d7..0cde96a 100644
--- a/src/main/scala/xyz/driver/core/storage/BlobStorage.scala
+++ b/src/main/scala/xyz/driver/core/storage/BlobStorage.scala
@@ -3,9 +3,9 @@ package xyz.driver.core.storage
import java.net.URL
import java.nio.file.Path
+import akka.Done
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
-import akka.{Done, NotUsed}
import scala.concurrent.Future
import scala.concurrent.duration.Duration
@@ -30,7 +30,7 @@ trait BlobStorage {
def content(name: String): Future[Option[Array[Byte]]]
/** Stream data asynchronously and with backpressure. */
- def download(name: String): Future[Option[Source[ByteString, NotUsed]]]
+ def download(name: String): Future[Option[Source[ByteString, Any]]]
/** Get a sink to upload data. */
def upload(name: String): Future[Sink[ByteString, Future[Done]]]