From cc86f8d609969b40793a227b9af4b41a18657dfb Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Thu, 22 Mar 2018 15:47:42 -0700 Subject: Add blob storage abstractions --- build.sbt | 2 +- .../xyz/driver/core/file/FileSystemStorage.scala | 1 + .../scala/xyz/driver/core/file/GcsStorage.scala | 1 + .../scala/xyz/driver/core/file/S3Storage.scala | 5 + .../xyz/driver/core/storage/BlobStorage.scala | 45 +++++++++ .../core/storage/FileSystemBlobStorage.scala | 75 ++++++++++++++ .../xyz/driver/core/storage/GcsBlobStorage.scala | 89 ++++++++++++++++ .../xyz/driver/core/storage/channelStreams.scala | 112 +++++++++++++++++++++ .../scala/xyz/driver/core/BlobStorageTest.scala | 88 ++++++++++++++++ 9 files changed, 417 insertions(+), 1 deletion(-) create mode 100644 src/main/scala/xyz/driver/core/storage/BlobStorage.scala create mode 100644 src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala create mode 100644 src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala create mode 100644 src/main/scala/xyz/driver/core/storage/channelStreams.scala create mode 100644 src/test/scala/xyz/driver/core/BlobStorageTest.scala diff --git a/build.sbt b/build.sbt index 1249336..9f1a562 100644 --- a/build.sbt +++ b/build.sbt @@ -23,7 +23,7 @@ lazy val core = (project in file(".")) "org.mockito" % "mockito-core" % "1.9.5" % Test, "com.amazonaws" % "aws-java-sdk-s3" % "1.11.26", "com.google.cloud" % "google-cloud-pubsub" % "0.25.0-beta", - "com.google.cloud" % "google-cloud-storage" % "1.7.0", + "com.google.cloud" % "google-cloud-storage" % "1.24.1", "com.typesafe" % "config" % "1.3.1", "ch.qos.logback" % "logback-classic" % "1.1.11", "com.googlecode.libphonenumber" % "libphonenumber" % "8.9.2" diff --git a/src/main/scala/xyz/driver/core/file/FileSystemStorage.scala b/src/main/scala/xyz/driver/core/file/FileSystemStorage.scala index 5a0df39..ce26fe4 100644 --- a/src/main/scala/xyz/driver/core/file/FileSystemStorage.scala +++ 
b/src/main/scala/xyz/driver/core/file/FileSystemStorage.scala @@ -12,6 +12,7 @@ import xyz.driver.core.time.Time import scala.concurrent.{ExecutionContext, Future} import scalaz.{ListT, OptionT} +@deprecated("Consider using xyz.driver.core.storage.FileSystemBlobStorage instead", "driver-core 1.8.14") class FileSystemStorage(executionContext: ExecutionContext) extends FileStorage { implicit private val execution = executionContext diff --git a/src/main/scala/xyz/driver/core/file/GcsStorage.scala b/src/main/scala/xyz/driver/core/file/GcsStorage.scala index 0970092..5c94645 100644 --- a/src/main/scala/xyz/driver/core/file/GcsStorage.scala +++ b/src/main/scala/xyz/driver/core/file/GcsStorage.scala @@ -20,6 +20,7 @@ import scala.concurrent.duration.Duration import scala.concurrent.{ExecutionContext, Future} import scalaz.{ListT, OptionT} +@deprecated("Consider using xyz.driver.core.storage.GcsBlobStorage instead", "driver-core 1.8.14") class GcsStorage( storageClient: Storage, bucketName: Name[Bucket], diff --git a/src/main/scala/xyz/driver/core/file/S3Storage.scala b/src/main/scala/xyz/driver/core/file/S3Storage.scala index 311aab3..5158d4d 100644 --- a/src/main/scala/xyz/driver/core/file/S3Storage.scala +++ b/src/main/scala/xyz/driver/core/file/S3Storage.scala @@ -15,6 +15,11 @@ import xyz.driver.core.time.Time import scala.concurrent.{ExecutionContext, Future} import scalaz.{ListT, OptionT} +@deprecated( + "Blob storage functionality has been reimplemented in xyz.driver.core.storage.BlobStorage. " + + "It has not been ported to S3 storage. 
Please raise an issue if this required for your use-case.", + "driver-core 1.8.14" +) class S3Storage(s3: AmazonS3, bucket: Name[Bucket], executionContext: ExecutionContext, chunkSize: Int = 4096) extends FileStorage { implicit private val execution = executionContext diff --git a/src/main/scala/xyz/driver/core/storage/BlobStorage.scala b/src/main/scala/xyz/driver/core/storage/BlobStorage.scala new file mode 100644 index 0000000..b12230e --- /dev/null +++ b/src/main/scala/xyz/driver/core/storage/BlobStorage.scala @@ -0,0 +1,45 @@ +package xyz.driver.core.storage + +import java.net.URL +import java.nio.file.Path + +import akka.stream.scaladsl.{Sink, Source} +import akka.util.ByteString +import akka.{Done, NotUsed} + +import scala.concurrent.Future +import scala.concurrent.duration.Duration + +/** Binary key-value store, typically implemented by cloud storage. */ +trait BlobStorage { + + /** Upload data by value. */ + def uploadContent(name: String, content: Array[Byte]): Future[String] + + /** Upload data from an existing file. */ + def uploadFile(name: String, content: Path): Future[String] + + def exists(name: String): Future[Boolean] + + /** List available keys. The prefix determines which keys should be listed + * and depends on the implementation (for instance, a file system backed + * blob store will treat a prefix as a directory path). */ + def list(prefix: String): Future[Set[String]] + + /** Get all the content of a given object. */ + def content(name: String): Future[Option[Array[Byte]]] + + /** Stream data asynchronously and with backpressure. */ + def download(name: String): Future[Option[Source[ByteString, NotUsed]]] + + /** Get a sink to upload data. */ + def upload(name: String): Future[Sink[ByteString, Future[Done]]] + + /** Delete a stored value. 
*/ + def delete(name: String): Future[String] + +} + +trait SignedBlobStorage extends BlobStorage { + def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]] +} diff --git a/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala b/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala new file mode 100644 index 0000000..80076b6 --- /dev/null +++ b/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala @@ -0,0 +1,75 @@ +package xyz.driver.core.storage + +import java.nio.file.{Files, Path, StandardCopyOption} + +import akka.stream.scaladsl.{FileIO, Sink, Source} +import akka.util.ByteString +import akka.{Done, NotUsed} + +import scala.collection.JavaConverters._ +import scala.concurrent.{ExecutionContext, Future} + +/** A blob store that is backed by a local filesystem. All objects are stored relative to the given + * root path. Slashes ('/') in blob names are treated as usual path separators and are converted + * to directories. */ +class FileSystemBlobStorage(root: Path)(implicit ec: ExecutionContext) extends BlobStorage { + + private def ensureParents(file: Path): Path = { + Files.createDirectories(file.getParent()) + file + } + + private def file(name: String) = root.resolve(name) + + override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future { + Files.write(ensureParents(file(name)), content) + name + } + override def uploadFile(name: String, content: Path): Future[String] = Future { + Files.copy(content, ensureParents(file(name)), StandardCopyOption.REPLACE_EXISTING) + name + } + + override def exists(name: String): Future[Boolean] = Future { + val path = file(name) + Files.exists(path) && Files.isReadable(path) + } + + override def list(prefix: String): Future[Set[String]] = Future { + val dir = file(prefix) + Files + .list(dir) + .iterator() + .asScala + .map(p => root.relativize(p)) + .map(_.toString) + .toSet + } + + override def content(name: String): 
Future[Option[Array[Byte]]] = exists(name) map { + case true => + Some(Files.readAllBytes(file(name))) + case false => None + } + + override def download(name: String): Future[Option[Source[ByteString, NotUsed]]] = Future { + if (Files.exists(file(name))) { + Some(FileIO.fromPath(file(name)).mapMaterializedValue(_ => NotUsed)) + } else { + None + } + } + + override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future { + val f = ensureParents(file(name)) + FileIO.toPath(f).mapMaterializedValue(_.map(_ => Done)) + } + + override def delete(name: String): Future[String] = exists(name).map { e => + if (e) { + Files.delete(file(name)) + } + name + } + +} diff --git a/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala b/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala new file mode 100644 index 0000000..c176d12 --- /dev/null +++ b/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala @@ -0,0 +1,89 @@ +package xyz.driver.core.storage + +import java.io.{FileInputStream, InputStream} +import java.net.URL +import java.nio.file.Path + +import akka.Done +import akka.stream.scaladsl.Sink +import akka.util.ByteString +import com.google.api.gax.paging.Page +import com.google.auth.oauth2.ServiceAccountCredentials +import com.google.cloud.storage.Storage.BlobListOption +import com.google.cloud.storage.{Blob, BlobId, Bucket, Storage, StorageOptions} + +import scala.collection.JavaConverters._ +import scala.concurrent.duration.Duration +import scala.concurrent.{ExecutionContext, Future} + +class GcsBlobStorage(client: Storage, bucketId: String, chunkSize: Int = GcsBlobStorage.DefaultChunkSize)( + implicit ec: ExecutionContext) + extends BlobStorage with SignedBlobStorage { + + private val bucket: Bucket = client.get(bucketId) + require(bucket != null, s"Bucket $bucketId does not exist.") + + override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future { + bucket.create(name, content).getBlobId.getName + } + + 
override def uploadFile(name: String, content: Path): Future[String] = Future { + bucket.create(name, new FileInputStream(content.toFile)).getBlobId.getName + } + + override def exists(name: String): Future[Boolean] = Future { + bucket.get(name) != null + } + + override def list(prefix: String): Future[Set[String]] = Future { + val page: Page[Blob] = bucket.list(BlobListOption.prefix(prefix)) + page + .iterateAll() + .asScala + .map(_.getName()) + .toSet + } + + override def content(name: String): Future[Option[Array[Byte]]] = Future { + Option(bucket.get(name)).map(blob => blob.getContent()) + } + + override def download(name: String) = Future { + Option(bucket.get(name)).map { blob => + ChannelStream.fromChannel(() => blob.reader(), chunkSize) + } + } + + override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future { + val blob = bucket.create(name, Array.emptyByteArray) + ChannelStream.toChannel(() => blob.writer(), chunkSize) + } + + override def delete(name: String): Future[String] = Future { + client.delete(BlobId.of(bucketId, name)) + name + } + + override def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]] = Future { + Option(bucket.get(name)).map(blob => blob.signUrl(duration.length, duration.unit)) + } + +} + +object GcsBlobStorage { + final val DefaultChunkSize = 8192 + + private def newClient(key: InputStream): Storage = + StorageOptions + .newBuilder() + .setCredentials(ServiceAccountCredentials.fromStream(key)) + .build() + .getService() + + def fromKeyfile(keyfile: Path, bucketId: String, chunkSize: Int = DefaultChunkSize)( + implicit ec: ExecutionContext): GcsBlobStorage = { + val client = newClient(new FileInputStream(keyfile.toFile)) + new GcsBlobStorage(client, bucketId, chunkSize) + } + +} diff --git a/src/main/scala/xyz/driver/core/storage/channelStreams.scala b/src/main/scala/xyz/driver/core/storage/channelStreams.scala new file mode 100644 index 0000000..fc652be --- /dev/null +++ 
b/src/main/scala/xyz/driver/core/storage/channelStreams.scala @@ -0,0 +1,112 @@ +package xyz.driver.core.storage + +import java.nio.ByteBuffer +import java.nio.channels.{ReadableByteChannel, WritableByteChannel} + +import akka.stream._ +import akka.stream.scaladsl.{Sink, Source} +import akka.stream.stage._ +import akka.util.ByteString +import akka.{Done, NotUsed} + +import scala.concurrent.{Future, Promise} +import scala.util.control.NonFatal + +class ChannelSource(createChannel: () => ReadableByteChannel, chunkSize: Int) + extends GraphStage[SourceShape[ByteString]] { + + val out = Outlet[ByteString]("ChannelSource.out") + val shape = SourceShape(out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + val channel = createChannel() + + object Handler extends OutHandler { + override def onPull(): Unit = { + try { + val buffer = ByteBuffer.allocate(chunkSize) + if (channel.read(buffer) > 0) { + buffer.flip() + push(out, ByteString.fromByteBuffer(buffer)) + } else { + completeStage() + } + } catch { + case NonFatal(_) => + channel.close() + } + } + override def onDownstreamFinish(): Unit = { + channel.close() + } + } + + setHandler(out, Handler) + } + +} + +class ChannelSink(createChannel: () => WritableByteChannel, chunkSize: Int) + extends GraphStageWithMaterializedValue[SinkShape[ByteString], Future[Done]] { + + val in = Inlet[ByteString]("ChannelSink.in") + val shape = SinkShape(in) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = { + val promise = Promise[Done]() + val logic = new GraphStageLogic(shape) { + val channel = createChannel() + + object Handler extends InHandler { + override def onPush(): Unit = { + try { + val data = grab(in) + channel.write(data.asByteBuffer) + pull(in) + } catch { + case NonFatal(e) => + channel.close() + promise.failure(e) + } + } + + override def onUpstreamFinish(): Unit = { + channel.close() + 
completeStage() + promise.success(Done) + } + + override def onUpstreamFailure(ex: Throwable): Unit = { + channel.close() + promise.failure(ex) + } + } + + setHandler(in, Handler) + + override def preStart(): Unit = { + pull(in) + } + } + (logic, promise.future) + } + +} + +object ChannelStream { + + def fromChannel(channel: () => ReadableByteChannel, chunkSize: Int = 8192): Source[ByteString, NotUsed] = { + Source + .fromGraph(new ChannelSource(channel, chunkSize)) + .withAttributes(Attributes(ActorAttributes.IODispatcher)) + .async + } + + def toChannel(channel: () => WritableByteChannel, chunkSize: Int = 8192): Sink[ByteString, Future[Done]] = { + Sink + .fromGraph(new ChannelSink(channel, chunkSize)) + .withAttributes(Attributes(ActorAttributes.IODispatcher)) + .async + } + +} diff --git a/src/test/scala/xyz/driver/core/BlobStorageTest.scala b/src/test/scala/xyz/driver/core/BlobStorageTest.scala new file mode 100644 index 0000000..65f9cbc --- /dev/null +++ b/src/test/scala/xyz/driver/core/BlobStorageTest.scala @@ -0,0 +1,88 @@ +package xyz.driver.core + +import java.nio.file.Files + +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import akka.stream.scaladsl._ +import akka.util.ByteString +import org.scalatest._ +import org.scalatest.concurrent.ScalaFutures +import xyz.driver.core.storage.{BlobStorage, FileSystemBlobStorage} + +import scala.concurrent.Future +import scala.concurrent.duration._ + +class BlobStorageTest extends FlatSpec with ScalaFutures { + + implicit val patientce = PatienceConfig(timeout = 100.seconds) + + implicit val system = ActorSystem("blobstorage-test") + implicit val mat = ActorMaterializer() + import system.dispatcher + + def storageBehaviour(storage: BlobStorage) = { + val key = "foo" + val data = "hello world".getBytes + it should "upload data" in { + assert(storage.exists(key).futureValue === false) + assert(storage.uploadContent(key, data).futureValue === key) + assert(storage.exists(key).futureValue === true) 
+ } + it should "download data" in { + val content = storage.content(key).futureValue + assert(content.isDefined) + assert(content.get === data) + } + it should "not download non-existing data" in { + assert(storage.content("bar").futureValue.isEmpty) + } + it should "overwrite an existing key" in { + val newData = "new string".getBytes("utf-8") + assert(storage.uploadContent(key, newData).futureValue === key) + assert(storage.content(key).futureValue.get === newData) + } + it should "upload a file" in { + val tmp = Files.createTempFile("testfile", "txt") + Files.write(tmp, data) + assert(storage.uploadFile(key, tmp).futureValue === key) + Files.delete(tmp) + } + it should "upload content" in { + val text = "foobar" + val src = Source + .single(text) + .map(l => ByteString(l)) + src.runWith(storage.upload(key).futureValue).futureValue + assert(storage.content(key).futureValue.map(_.toSeq) === Some("foobar".getBytes.toSeq)) + } + it should "delete content" in { + assert(storage.exists(key).futureValue) + storage.delete(key).futureValue + assert(!storage.exists(key).futureValue) + } + it should "download content" in { + storage.uploadContent(key, data) futureValue + val srcOpt = storage.download(key).futureValue + assert(srcOpt.isDefined) + val src = srcOpt.get + val content: Future[Array[Byte]] = src.runWith(Sink.fold(Array[Byte]())(_ ++ _)) + assert(content.futureValue === data) + } + it should "list keys" in { + assert(storage.list("").futureValue === Set(key)) + storage.uploadContent("a/a.txt", data).futureValue + storage.uploadContent("a/b", data).futureValue + storage.uploadContent("c/d", data).futureValue + storage.uploadContent("d", data).futureValue + assert(storage.list("").futureValue === Set(key, "a", "c", "d")) + assert(storage.list("a").futureValue === Set("a/a.txt", "a/b")) + assert(storage.list("a").futureValue === Set("a/a.txt", "a/b")) + assert(storage.list("c").futureValue === Set("c/d")) + } + } + + "File system storage" should behave like 
storageBehaviour( + new FileSystemBlobStorage(Files.createTempDirectory("test"))) + +} -- cgit v1.2.3 From 57ee8fa785c3815e45e473e5625d6e3cb1cd9402 Mon Sep 17 00:00:00 2001 From: Sergey Nastich Date: Mon, 9 Apr 2018 16:53:45 -0700 Subject: Add convenience methods to work with Tags: `tagged` wrapper and a proxy JSON format (#147) * Add convenience methods to work with Tags: `tagged` wrapper and a proxy JSON format --- src/main/scala/xyz/driver/core/core.scala | 4 ++++ src/main/scala/xyz/driver/core/json.scala | 12 ++++++++++-- src/test/scala/xyz/driver/core/JsonTest.scala | 14 ++++++++++++++ 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/src/main/scala/xyz/driver/core/core.scala b/src/main/scala/xyz/driver/core/core.scala index be19f0f..14e1b10 100644 --- a/src/main/scala/xyz/driver/core/core.scala +++ b/src/main/scala/xyz/driver/core/core.scala @@ -25,6 +25,10 @@ package object core { object tagging { private[core] trait Tagged[+V, +Tag] + + implicit class Taggable[V <: Any](val v: V) extends AnyVal { + def tagged[Tag]: V @@ Tag = v.asInstanceOf[V @@ Tag] + } } type @@[+V, +Tag] = V with tagging.Tagged[V, Tag] diff --git a/src/main/scala/xyz/driver/core/json.scala b/src/main/scala/xyz/driver/core/json.scala index 06a8837..e7efce6 100644 --- a/src/main/scala/xyz/driver/core/json.scala +++ b/src/main/scala/xyz/driver/core/json.scala @@ -34,16 +34,24 @@ object json { } } - implicit def idFormat[T] = new RootJsonFormat[Id[T]] { + implicit def idFormat[T]: JsonFormat[Id[T]] = new JsonFormat[Id[T]] { def write(id: Id[T]) = JsString(id.value) - def read(value: JsValue) = value match { + def read(value: JsValue): Id[T] = value match { case JsString(id) if Try(UUID.fromString(id)).isSuccess => Id[T](id.toLowerCase) case JsString(id) => Id[T](id) case _ => throw DeserializationException("Id expects string") } } + implicit def taggedFormat[F, T](implicit underlying: JsonFormat[F]): JsonFormat[F @@ T] = new JsonFormat[F @@ T] { + import tagging._ + + 
override def write(obj: F @@ T): JsValue = underlying.write(obj) + + override def read(json: JsValue): F @@ T = underlying.read(json).tagged[T] + } + def NameInPath[T]: PathMatcher1[Name[T]] = new PathMatcher1[Name[T]] { def apply(path: Path) = path match { case Path.Segment(segment, tail) => Matched(tail, Tuple1(Name[T](segment))) diff --git a/src/test/scala/xyz/driver/core/JsonTest.scala b/src/test/scala/xyz/driver/core/JsonTest.scala index 7e8dba2..b845a44 100644 --- a/src/test/scala/xyz/driver/core/JsonTest.scala +++ b/src/test/scala/xyz/driver/core/JsonTest.scala @@ -13,6 +13,7 @@ import spray.json._ import xyz.driver.core.TestTypes.CustomGADT import xyz.driver.core.domain.{Email, PhoneNumber} import xyz.driver.core.json.enumeratum.HasJsonFormat +import xyz.driver.core.tagging.Taggable import xyz.driver.core.time.TimeOfDay import scala.collection.immutable.IndexedSeq @@ -31,6 +32,19 @@ class JsonTest extends FlatSpec with Matchers { parsedId should be(referenceId) } + "Json format for @@" should "read and write correct JSON" in { + trait Irrelevant + val reference = Id[JsonTest]("SomeID").tagged[Irrelevant] + + val format = json.taggedFormat[Id[JsonTest], Irrelevant] + + val writtenJson = format.write(reference) + writtenJson shouldBe JsString("SomeID") + + val parsedId: Id[JsonTest] @@ Irrelevant = format.read(writtenJson) + parsedId shouldBe reference + } + "Json format for Name" should "read and write correct JSON" in { val referenceName = Name[String]("Homer") -- cgit v1.2.3