From 7c755c77afbd67ae2ded9d8b004736d4e27e208f Mon Sep 17 00:00:00 2001
From: Jakob Odersky
Date: Wed, 12 Sep 2018 16:18:26 -0700
Subject: Move storage and messaging to separate projects

---
 .../driver/core/storage/AliyunBlobStorage.scala    | 153 ++++++++++++++++++++
 .../xyz/driver/core/storage/BlobStorage.scala      |  54 +++++++
 .../core/storage/FileSystemBlobStorage.scala       | 105 ++++++++++++++
 .../xyz/driver/core/storage/GcsBlobStorage.scala   | 104 ++++++++++++++
 .../xyz/driver/core/storage/channelStreams.scala   | 142 +++++++++++++++++++
 .../scala/xyz/driver/core/BlobStorageTest.scala    | 107 ++++++++++++++
 6 files changed, 665 insertions(+)
 create mode 100644 core-storage/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala
 create mode 100644 core-storage/src/main/scala/xyz/driver/core/storage/BlobStorage.scala
 create mode 100644 core-storage/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala
 create mode 100644 core-storage/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala
 create mode 100644 core-storage/src/main/scala/xyz/driver/core/storage/channelStreams.scala
 create mode 100644 core-storage/src/test/scala/xyz/driver/core/BlobStorageTest.scala

(limited to 'core-storage')

diff --git a/core-storage/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala b/core-storage/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala
new file mode 100644
index 0000000..7e59df4
--- /dev/null
+++ b/core-storage/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala
@@ -0,0 +1,153 @@
+package xyz.driver.core.storage
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.net.URL
+import java.nio.file.Path
+import java.time.Clock
+import java.util.Date
+
+import akka.Done
+import akka.stream.scaladsl.{Sink, Source, StreamConverters}
+import akka.util.ByteString
+import com.aliyun.oss.OSSClient
+import com.aliyun.oss.model.{ListObjectsRequest, ObjectPermission}
+import com.typesafe.config.Config
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{ExecutionContext, Future}
+
+/** Blob storage backed by an Aliyun OSS bucket.
+  *
+  * Every operation runs its OSS client calls inside a `Future` on the implicit execution context.
+  *
+  * @param client    OSS client used for all requests
+  * @param bucketId  name of the bucket that holds the blobs
+  * @param clock     time source used to compute the expiration of signed URLs
+  * @param chunkSize buffer size, in bytes, used when reading object content
+  */
+class AliyunBlobStorage(
+    client: OSSClient,
+    bucketId: String,
+    clock: Clock,
+    chunkSize: Int = AliyunBlobStorage.DefaultChunkSize)(implicit ec: ExecutionContext)
+    extends SignedBlobStorage {
+
+  require(chunkSize > 0, s"chunkSize must be positive, got $chunkSize")
+
+  /** Evaluates `result` only if `name` exists in the bucket; a `null` result is also mapped to None.
+    *
+    * NOTE(review): the OSS SDK presumably reports a missing object by throwing (NoSuchKey) rather than by
+    * returning `null`, hence the explicit check that `signedDownloadUrl` already relied on -- confirm against
+    * the SDK version in use.
+    */
+  private def whenExists[A](name: String)(result: => A): Option[A] =
+    if (client.doesObjectExist(bucketId, name)) Option(result) else None
+
+  override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future {
+    client.putObject(bucketId, name, new ByteArrayInputStream(content))
+    name
+  }
+
+  override def uploadFile(name: String, content: Path): Future[String] = Future {
+    client.putObject(bucketId, name, content.toFile)
+    name
+  }
+
+  override def exists(name: String): Future[Boolean] = Future {
+    client.doesObjectExist(bucketId, name)
+  }
+
+  /** Lists every key starting with `prefix`. A single `listObjects` call only returns one page of keys, so the
+    * listing is followed page by page until it is no longer truncated. */
+  override def list(prefix: String): Future[Set[String]] = Future {
+    @tailrec
+    def loop(marker: Option[String], acc: Set[String]): Set[String] = {
+      val request = new ListObjectsRequest(bucketId).withPrefix(prefix)
+      marker.foreach(m => request.setMarker(m))
+      val listing = client.listObjects(request)
+      val keys    = acc ++ listing.getObjectSummaries.asScala.map(_.getKey)
+      Option(listing.getNextMarker) match {
+        case Some(next) if listing.isTruncated => loop(Some(next), keys)
+        case _                                 => keys
+      }
+    }
+    loop(None, Set.empty)
+  }
+
+  /** Reads the whole object into memory. The object stream is always closed, even when reading fails, so it
+    * cannot leak. */
+  override def content(name: String): Future[Option[Array[Byte]]] = Future {
+    whenExists(name)(client.getObject(bucketId, name)).map { obj =>
+      val inputStream = obj.getObjectContent
+      try {
+        val out    = new ByteArrayOutputStream()
+        val buffer = new Array[Byte](chunkSize)
+        Iterator
+          .continually(inputStream.read(buffer))
+          .takeWhile(_ != -1)
+          .foreach(count => out.write(buffer, 0, count))
+        out.toByteArray
+      } finally inputStream.close()
+    }
+  }
+
+  /** Streams the object. It is only fetched once the returned source is materialized, so a source that is never
+    * run does not hold on to an open object stream. */
+  override def download(name: String): Future[Option[Source[ByteString, Any]]] = Future {
+    whenExists(name) {
+      StreamConverters.fromInputStream(() => client.getObject(bucketId, name).getObjectContent, chunkSize)
+    }
+  }
+
+  override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future {
+    StreamConverters
+      .asInputStream()
+      .mapMaterializedValue(is =>
+        Future {
+          client.putObject(bucketId, name, is)
+          Done
+      })
+  }
+
+  override def delete(name: String): Future[String] = Future {
+    client.deleteObject(bucketId, name)
+    name
+  }
+
+  override def url(name: String): Future[Option[URL]] = Future {
+    // Based on https://www.alibabacloud.com/help/faq-detail/39607.htm
+    whenExists(name)(client.getObjectAcl(bucketId, name)).map { acl =>
+      val isPrivate   = acl.getPermission == ObjectPermission.Private
+      val bucket      = client.getBucketInfo(bucketId).getBucket
+      val endpointUrl = if (isPrivate) bucket.getIntranetEndpoint else bucket.getExtranetEndpoint
+      new URL(s"https://$bucketId.$endpointUrl/$name")
+    }
+  }
+
+  override def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]] = Future {
+    whenExists(name) {
+      val expiration = new Date(clock.millis() + duration.toMillis)
+      client.generatePresignedUrl(bucketId, name, expiration)
+    }
+  }
+}
+
+object AliyunBlobStorage {
+  val DefaultChunkSize: Int = 8192
+
+  def apply(config: Config, bucketId: String, clock: Clock)(
+      implicit ec: ExecutionContext): AliyunBlobStorage = {
+    val clientId     = config.getString("storage.aliyun.clientId")
+    val clientSecret = config.getString("storage.aliyun.clientSecret")
+    val endpoint     = config.getString("storage.aliyun.endpoint")
+    this(clientId, clientSecret, endpoint, bucketId, clock)
+  }
+
+  def apply(clientId: String, clientSecret: String, endpoint: String, bucketId: String, clock: Clock)(
+      implicit ec: ExecutionContext): AliyunBlobStorage = {
+    val client = new OSSClient(endpoint, clientId, clientSecret)
+    new AliyunBlobStorage(client, bucketId, clock)
+  }
+}
diff --git a/core-storage/src/main/scala/xyz/driver/core/storage/BlobStorage.scala b/core-storage/src/main/scala/xyz/driver/core/storage/BlobStorage.scala
new file mode 100644
index 0000000..0cde96a
--- /dev/null
+++ b/core-storage/src/main/scala/xyz/driver/core/storage/BlobStorage.scala
@@ -0,0 +1,54 @@
+package xyz.driver.core.storage
+
+import java.net.URL
+import java.nio.file.Path
+
+import akka.Done
+import akka.stream.scaladsl.{Sink, Source}
+import akka.util.ByteString
+
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+
+/** Binary key-value store, typically implemented by cloud storage. */
+trait BlobStorage {
+
+  /** Upload data by value. */
+  def uploadContent(name: String, content: Array[Byte]): Future[String]
+
+  /** Upload data from an existing file. */
+  def uploadFile(name: String, content: Path): Future[String]
+
+  /** Check whether a value is stored under the given key. */
+  def exists(name: String): Future[Boolean]
+
+  /** List available keys. The prefix determines which keys should be listed
+    * and depends on the implementation (for instance, a file system backed
+    * blob store will treat a prefix as a directory path). */
+  def list(prefix: String): Future[Set[String]]
+
+  /** Get all the content of a given object. */
+  def content(name: String): Future[Option[Array[Byte]]]
+
+  /** Stream data asynchronously and with backpressure. */
+  def download(name: String): Future[Option[Source[ByteString, Any]]]
+
+  /** Get a sink to upload data. */
+  def upload(name: String): Future[Sink[ByteString, Future[Done]]]
+
+  /** Delete a stored value. */
+  def delete(name: String): Future[String]
+
+  /**
+    * Path to specified resource. Checks that the resource exists and returns None if
+    * it is not found. Depending on the implementation, may throw.
+    */
+  def url(name: String): Future[Option[URL]]
+}
+
+/** A blob storage that can additionally hand out time-limited download URLs. */
+trait SignedBlobStorage extends BlobStorage {
+
+  /** URL granting access to the given object for `duration`, or None if the object is not found. */
+  def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]]
+}
diff --git a/core-storage/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala b/core-storage/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala
new file mode 100644
index 0000000..e12c73d
--- /dev/null
+++ b/core-storage/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala
@@ -0,0 +1,105 @@
+package xyz.driver.core.storage
+
+import java.net.URL
+import java.nio.file.{Files, Path, StandardCopyOption}
+
+import akka.stream.scaladsl.{FileIO, Sink, Source}
+import akka.util.ByteString
+import akka.{Done, NotUsed}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+
+/** A blob store that is backed by a local filesystem. All objects are stored relative to the given
+  * root path. Slashes ('/') in blob names are treated as usual path separators and are converted
+  * to directories. Names that would resolve to a location outside of the root path are rejected. */
+class FileSystemBlobStorage(root: Path)(implicit ec: ExecutionContext) extends BlobStorage {
+
+  // absolute, normalized form of the root, used only to check that resolved names stay inside of it
+  private val absoluteRoot: Path = root.toAbsolutePath.normalize()
+
+  /** Creates the missing parent directories of `file`; a path without a parent is left alone. */
+  private def ensureParents(file: Path): Path = {
+    Option(file.getParent).foreach(parent => Files.createDirectories(parent))
+    file
+  }
+
+  /** Resolves a blob name against the root path. Throws an `IllegalArgumentException` for names, such as
+    * "../x" or an absolute path, that would end up outside of the root. */
+  private def file(name: String): Path = {
+    val resolved = root.resolve(name)
+    require(
+      resolved.toAbsolutePath.normalize().startsWith(absoluteRoot),
+      s"Blob name '$name' resolves to a path outside of the storage root.")
+    resolved
+  }
+
+  override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future {
+    Files.write(ensureParents(file(name)), content)
+    name
+  }
+  override def uploadFile(name: String, content: Path): Future[String] = Future {
+    Files.copy(content, ensureParents(file(name)), StandardCopyOption.REPLACE_EXISTING)
+    name
+  }
+
+  override def exists(name: String): Future[Boolean] = Future {
+    val path = file(name)
+    Files.exists(path) && Files.isReadable(path)
+  }
+
+  /** Lists the direct children of the directory denoted by `prefix`. A prefix that does not denote an existing
+    * directory yields no keys. */
+  override def list(prefix: String): Future[Set[String]] = Future {
+    val dir = file(prefix)
+    if (Files.isDirectory(dir)) {
+      // Files.list keeps the directory open until the returned stream is closed
+      val entries = Files.list(dir)
+      try {
+        entries
+          .iterator()
+          .asScala
+          .map(p => root.relativize(p))
+          .map(_.toString)
+          .toSet
+      } finally entries.close()
+    } else {
+      Set.empty[String]
+    }
+  }
+
+  override def content(name: String): Future[Option[Array[Byte]]] = exists(name) map {
+    case true =>
+      Some(Files.readAllBytes(file(name)))
+    case false => None
+  }
+
+  override def download(name: String): Future[Option[Source[ByteString, NotUsed]]] = Future {
+    if (Files.exists(file(name))) {
+      Some(FileIO.fromPath(file(name)).mapMaterializedValue(_ => NotUsed))
+    } else {
+      None
+    }
+  }
+
+  /** Sink writing to the file of the given blob. The materialized future fails if writing the file failed. */
+  override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future {
+    val f = ensureParents(file(name))
+    // an IOResult may carry a write failure inside of a successfully completed future; surface it to the caller
+    FileIO.toPath(f).mapMaterializedValue(_.flatMap(result => Future.fromTry(result.status)))
+  }
+
+  override def delete(name: String): Future[String] = exists(name).map { e =>
+    if (e) {
+      Files.delete(file(name))
+    }
+    name
+  }
+
+  override def url(name: String): Future[Option[URL]] = exists(name) map {
+    case true =>
+      Some(file(name).toUri.toURL)
+    case false =>
+      None
+  }
+}
diff --git a/core-storage/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala b/core-storage/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala
new file mode 100644
index 0000000..95164c7
--- /dev/null
+++ b/core-storage/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala
@@ -0,0 +1,104 @@
+package xyz.driver.core.storage
+
+import java.io.{FileInputStream, InputStream}
+import java.net.URL
+import java.nio.file.Path
+
+import akka.stream.scaladsl.{Sink, Source}
+import akka.util.ByteString
+import akka.{Done, NotUsed}
+import com.google.api.gax.paging.Page
+import com.google.auth.oauth2.ServiceAccountCredentials
+import com.google.cloud.storage.Storage.BlobListOption
+import com.google.cloud.storage.{Blob, BlobId, Bucket, Storage, StorageOptions}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{ExecutionContext, Future}
+
+/** Blob storage backed by a Google Cloud Storage bucket. Construction fails if the bucket does not exist. */
+class GcsBlobStorage(client: Storage, bucketId: String, chunkSize: Int = GcsBlobStorage.DefaultChunkSize)(
+    implicit ec: ExecutionContext)
+    extends BlobStorage with SignedBlobStorage {
+
+  private val bucket: Bucket = client.get(bucketId)
+  require(bucket != null, s"Bucket $bucketId does not exist.")
+
+  override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future {
+    bucket.create(name, content).getBlobId.getName
+  }
+
+  override def uploadFile(name: String, content: Path): Future[String] = Future {
+    // the stream is opened here, so it is also closed here once the upload has finished or failed
+    val in = new FileInputStream(content.toFile)
+    try bucket.create(name, in).getBlobId.getName
+    finally in.close()
+  }
+
+  override def exists(name: String): Future[Boolean] = Future {
+    bucket.get(name) != null
+  }
+
+  override def list(prefix: String): Future[Set[String]] = Future {
+    val page: Page[Blob] = bucket.list(BlobListOption.prefix(prefix))
+    page
+      .iterateAll()
+      .asScala
+      .map(_.getName())
+      .toSet
+  }
+
+  override def content(name: String): Future[Option[Array[Byte]]] = Future {
+    Option(bucket.get(name)).map(blob => blob.getContent())
+  }
+
+  override def download(name: String): Future[Option[Source[ByteString, NotUsed]]] = Future {
+    Option(bucket.get(name)).map { blob =>
+      ChannelStream.fromChannel(() => blob.reader(), chunkSize)
+    }
+  }
+
+  override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future {
+    val blob = bucket.create(name, Array.emptyByteArray)
+    ChannelStream.toChannel(() => blob.writer(), chunkSize)
+  }
+
+  override def delete(name: String): Future[String] = Future {
+    client.delete(BlobId.of(bucketId, name))
+    name
+  }
+
+  override def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]] = Future {
+    Option(bucket.get(name)).map(blob => blob.signUrl(duration.length, duration.unit))
+  }
+
+  override def url(name: String): Future[Option[URL]] = Future {
+    val protocol: String = "https"
+    val host: String     = "storage.googleapis.com"
+    Option(bucket.get(name)).map { blob =>
+      // bucket and blob name are part of the path: handing them to the URL constructor as the host yields the
+      // same string form, but wrong host and path components
+      new URL(protocol, host, s"/${bucket.getName}/${blob.getName}")
+    }
+  }
+}
+
+object GcsBlobStorage {
+  final val DefaultChunkSize = 8192
+
+  private def newClient(key: InputStream): Storage =
+    StorageOptions
+      .newBuilder()
+      .setCredentials(ServiceAccountCredentials.fromStream(key))
+      .build()
+      .getService()
+
+  def fromKeyfile(keyfile: Path, bucketId: String, chunkSize: Int = DefaultChunkSize)(
+      implicit ec: ExecutionContext): GcsBlobStorage = {
+    // NOTE(review): assumes the credentials are read eagerly from the stream while the client is built -- confirm
+    val key    = new FileInputStream(keyfile.toFile)
+    val client = try newClient(key) finally key.close()
+    new GcsBlobStorage(client, bucketId, chunkSize)
+  }
+
+}
diff --git a/core-storage/src/main/scala/xyz/driver/core/storage/channelStreams.scala b/core-storage/src/main/scala/xyz/driver/core/storage/channelStreams.scala
new file mode 100644
index 0000000..fc652be
--- /dev/null
+++ b/core-storage/src/main/scala/xyz/driver/core/storage/channelStreams.scala
@@ -0,0 +1,142 @@
+package xyz.driver.core.storage
+
+import java.nio.ByteBuffer
+import java.nio.channels.{ReadableByteChannel, WritableByteChannel}
+
+import akka.stream._
+import akka.stream.scaladsl.{Sink, Source}
+import akka.stream.stage._
+import akka.util.ByteString
+import akka.{Done, NotUsed}
+
+import scala.concurrent.{Future, Promise}
+import scala.util.control.NonFatal
+import scala.util.{Failure, Success, Try}
+
+/** Source that emits the content of a channel in chunks of at most `chunkSize` bytes. The channel is created
+  * when the stage is materialized and closed when the stage stops, whatever the reason. */
+class ChannelSource(createChannel: () => ReadableByteChannel, chunkSize: Int)
+    extends GraphStage[SourceShape[ByteString]] {
+
+  val out   = Outlet[ByteString]("ChannelSource.out")
+  val shape = SourceShape(out)
+
+  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
+    val channel = createChannel()
+
+    object Handler extends OutHandler {
+      override def onPull(): Unit = {
+        try {
+          val buffer = ByteBuffer.allocate(chunkSize)
+          if (channel.read(buffer) < 0) {
+            // only a negative count signals the end of the channel; a read of zero bytes does not
+            completeStage()
+          } else {
+            buffer.flip()
+            push(out, ByteString.fromByteBuffer(buffer))
+          }
+        } catch {
+          // propagate the error downstream; merely closing the channel would leave the stream hanging
+          case NonFatal(e) => failStage(e)
+        }
+      }
+    }
+
+    setHandler(out, Handler)
+
+    // runs on completion, failure and cancellation alike, so the channel is released on every path
+    override def postStop(): Unit = channel.close()
+  }
+
+}
+
+/** Sink that writes every incoming element to a channel. The channel is created when the stage is
+  * materialized. The materialized future completes once the channel has been closed after the last element,
+  * and fails if writing or closing fails, if upstream fails, or if the stage is stopped before completion.
+  *
+  * `chunkSize` is not used by this stage. */
+class ChannelSink(createChannel: () => WritableByteChannel, chunkSize: Int)
+    extends GraphStageWithMaterializedValue[SinkShape[ByteString], Future[Done]] {
+
+  val in    = Inlet[ByteString]("ChannelSink.in")
+  val shape = SinkShape(in)
+
+  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
+    val promise = Promise[Done]()
+    val logic = new GraphStageLogic(shape) {
+      val channel = createChannel()
+
+      private def closeChannel(): Try[Done] = Try {
+        channel.close()
+        Done
+      }
+
+      object Handler extends InHandler {
+        override def onPush(): Unit = {
+          try {
+            val buffer = grab(in).asByteBuffer
+            // a single write may consume only part of the buffer
+            while (buffer.hasRemaining) channel.write(buffer)
+            pull(in)
+          } catch {
+            case NonFatal(e) =>
+              closeChannel()
+              promise.tryFailure(e)
+              failStage(e)
+          }
+        }
+
+        override def onUpstreamFinish(): Unit = closeChannel() match {
+          case Success(done) =>
+            promise.trySuccess(done)
+            completeStage()
+          case Failure(e) =>
+            // closing is part of finishing the write, so a failed close is reported as a failed upload
+            promise.tryFailure(e)
+            failStage(e)
+        }
+
+        override def onUpstreamFailure(ex: Throwable): Unit = {
+          closeChannel()
+          promise.tryFailure(ex)
+          failStage(ex)
+        }
+      }
+
+      setHandler(in, Handler)
+
+      override def preStart(): Unit = {
+        pull(in)
+      }
+
+      override def postStop(): Unit = {
+        // reached with a pending promise only if the stage was stopped without any of the handlers above running
+        if (!promise.isCompleted) {
+          closeChannel()
+          promise.tryFailure(new AbruptStageTerminationException(this))
+        }
+      }
+    }
+    (logic, promise.future)
+  }
+
+}
+
+/** Factories for streams that read from and write to NIO channels on the blocking IO dispatcher. */
+object ChannelStream {
+
+  def fromChannel(channel: () => ReadableByteChannel, chunkSize: Int = 8192): Source[ByteString, NotUsed] = {
+    Source
+      .fromGraph(new ChannelSource(channel, chunkSize))
+      .withAttributes(Attributes(ActorAttributes.IODispatcher))
+      .async
+  }
+
+  def toChannel(channel: () => WritableByteChannel, chunkSize: Int = 8192): Sink[ByteString, Future[Done]] = {
+    Sink
+      .fromGraph(new ChannelSink(channel, chunkSize))
+      .withAttributes(Attributes(ActorAttributes.IODispatcher))
+      .async
+  }
+
+}
diff --git a/core-storage/src/test/scala/xyz/driver/core/BlobStorageTest.scala b/core-storage/src/test/scala/xyz/driver/core/BlobStorageTest.scala
new file mode 100644
index 0000000..811cc60
--- /dev/null
+++ b/core-storage/src/test/scala/xyz/driver/core/BlobStorageTest.scala
@@ -0,0 +1,107 @@
+package xyz.driver.core
+
+import java.nio.file.Files
+
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl._
+import akka.util.ByteString
+import org.scalatest._
+import org.scalatest.concurrent.ScalaFutures
+import xyz.driver.core.storage.{BlobStorage, FileSystemBlobStorage}
+
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+class BlobStorageTest extends FlatSpec with ScalaFutures with BeforeAndAfterAll {
+
+  implicit val defaultPatience = PatienceConfig(timeout = 100.seconds)
+
+  implicit val system = ActorSystem("blobstorage-test")
+  implicit val mat    = ActorMaterializer()
+  import system.dispatcher
+
+  override def afterAll(): Unit = {
+    system.terminate().futureValue
+    super.afterAll()
+  }
+
+  def storageBehaviour(storage: BlobStorage) = {
+    val key  = "foo"
+    val data = "hello world".getBytes
+    it should "upload data" in {
+      assert(storage.exists(key).futureValue === false)
+      assert(storage.uploadContent(key, data).futureValue === key)
+      assert(storage.exists(key).futureValue === true)
+    }
+    it should "download data" in {
+      val content = storage.content(key).futureValue
+      assert(content.isDefined)
+      assert(content.get === data)
+    }
+    it should "not download non-existing data" in {
+      assert(storage.content("bar").futureValue.isEmpty)
+    }
+    it should "overwrite an existing key" in {
+      val newData = "new string".getBytes("utf-8")
+      assert(storage.uploadContent(key, newData).futureValue === key)
+      assert(storage.content(key).futureValue.get === newData)
+    }
+    it should "upload a file" in {
+      val tmp = Files.createTempFile("testfile", "txt")
+      Files.write(tmp, data)
+      assert(storage.uploadFile(key, tmp).futureValue === key)
+      Files.delete(tmp)
+    }
+    it should "upload content" in {
+      val text = "foobar"
+      val src = Source
+        .single(text)
+        .map(l => ByteString(l))
+      src.runWith(storage.upload(key).futureValue).futureValue
+      assert(storage.content(key).futureValue.map(_.toSeq) === Some("foobar".getBytes.toSeq))
+    }
+    it should "delete content" in {
+      assert(storage.exists(key).futureValue)
+      storage.delete(key).futureValue
+      assert(!storage.exists(key).futureValue)
+    }
+    it should "download content" in {
+      storage.uploadContent(key, data).futureValue
+      val srcOpt = storage.download(key).futureValue
+      assert(srcOpt.isDefined)
+      val src                          = srcOpt.get
+      val content: Future[Array[Byte]] = src.runWith(Sink.fold(Array[Byte]())(_ ++ _))
+      assert(content.futureValue === data)
+    }
+    it should "list keys" in {
+      assert(storage.list("").futureValue === Set(key))
+      storage.uploadContent("a/a.txt", data).futureValue
+      storage.uploadContent("a/b", data).futureValue
+      storage.uploadContent("c/d", data).futureValue
+      storage.uploadContent("d", data).futureValue
+      assert(storage.list("").futureValue === Set(key, "a", "c", "d"))
+      assert(storage.list("a").futureValue === Set("a/a.txt", "a/b"))
+      assert(storage.list("c").futureValue === Set("c/d"))
+    }
+    it should "list no keys under an unknown prefix" in {
+      assert(storage.list("does-not-exist").futureValue === Set.empty[String])
+    }
+    it should "get valid URL" in {
+      assert(storage.exists(key).futureValue === true)
+      val fooUrl = storage.url(key).futureValue
+      assert(fooUrl.isDefined)
+    }
+  }
+
+  "File system storage" should behave like storageBehaviour(
+    new FileSystemBlobStorage(Files.createTempDirectory("test")))
+
+  it should "reject names that resolve outside of the root directory" in {
+    val storage = new FileSystemBlobStorage(Files.createTempDirectory("test"))
+    val failure = storage.uploadContent("../escaped", "data".getBytes).failed.futureValue
+    assert(failure.isInstanceOf[IllegalArgumentException])
+  }
+
+}
--
cgit v1.2.3