diff options
author | Jakob Odersky <jakob@driver.xyz> | 2018-09-12 16:18:26 -0700 |
---|---|---|
committer | Jakob Odersky <jakob@odersky.com> | 2018-10-09 16:19:39 -0700 |
commit | 7c755c77afbd67ae2ded9d8b004736d4e27e208f (patch) | |
tree | e93f4590165a338ed284adeb6f4a6bd43bb16b6a /src/main/scala/xyz/driver/core/storage | |
parent | 76db2360364a35be31414a12cbc419a534a51744 (diff) | |
download | driver-core-7c755c77afbd67ae2ded9d8b004736d4e27e208f.tar.gz driver-core-7c755c77afbd67ae2ded9d8b004736d4e27e208f.tar.bz2 driver-core-7c755c77afbd67ae2ded9d8b004736d4e27e208f.zip |
Move storage and messaging to separate projects
Diffstat (limited to 'src/main/scala/xyz/driver/core/storage')
5 files changed, 0 insertions, 448 deletions
diff --git a/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala b/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala deleted file mode 100644 index b5e8678..0000000 --- a/src/main/scala/xyz/driver/core/storage/AliyunBlobStorage.scala +++ /dev/null @@ -1,108 +0,0 @@ -package xyz.driver.core.storage - -import java.io.ByteArrayInputStream -import java.net.URL -import java.nio.file.Path -import java.util.Date - -import akka.Done -import akka.stream.scaladsl.{Sink, Source, StreamConverters} -import akka.util.ByteString -import com.aliyun.oss.OSSClient -import com.aliyun.oss.model.ObjectPermission -import com.typesafe.config.Config -import xyz.driver.core.time.provider.TimeProvider - -import scala.collection.JavaConverters._ -import scala.concurrent.duration.Duration -import scala.concurrent.{ExecutionContext, Future} - -class AliyunBlobStorage( - client: OSSClient, - bucketId: String, - timeProvider: TimeProvider, - chunkSize: Int = AliyunBlobStorage.DefaultChunkSize)(implicit ec: ExecutionContext) - extends SignedBlobStorage { - override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future { - client.putObject(bucketId, name, new ByteArrayInputStream(content)) - name - } - - override def uploadFile(name: String, content: Path): Future[String] = Future { - client.putObject(bucketId, name, content.toFile) - name - } - - override def exists(name: String): Future[Boolean] = Future { - client.doesObjectExist(bucketId, name) - } - - override def list(prefix: String): Future[Set[String]] = Future { - client.listObjects(bucketId, prefix).getObjectSummaries.asScala.map(_.getKey)(collection.breakOut) - } - - override def content(name: String): Future[Option[Array[Byte]]] = Future { - Option(client.getObject(bucketId, name)).map { obj => - val inputStream = obj.getObjectContent - Stream.continually(inputStream.read).takeWhile(_ != -1).map(_.toByte).toArray - } - } - - override def download(name: String): Future[Option[Source[ByteString, Any]]] = Future { - Option(client.getObject(bucketId, name)).map { obj => - StreamConverters.fromInputStream(() => obj.getObjectContent, chunkSize) - } - } - - override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future { - StreamConverters - .asInputStream() - .mapMaterializedValue(is => - Future { - client.putObject(bucketId, name, is) - Done - }) - } - - override def delete(name: String): Future[String] = Future { - client.deleteObject(bucketId, name) - name - } - - override def url(name: String): Future[Option[URL]] = Future { - // Based on https://www.alibabacloud.com/help/faq-detail/39607.htm - Option(client.getObjectAcl(bucketId, name)).map { acl => - val isPrivate = acl.getPermission == ObjectPermission.Private - val bucket = client.getBucketInfo(bucketId).getBucket - val endpointUrl = if (isPrivate) bucket.getIntranetEndpoint else bucket.getExtranetEndpoint - new URL(s"https://$bucketId.$endpointUrl/$name") - } - } - - override def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]] = Future { - if (client.doesObjectExist(bucketId, name)) { - val expiration = new Date(timeProvider.currentTime().advanceBy(duration).millis) - Some(client.generatePresignedUrl(bucketId, name, expiration)) - } else { - None - } - } -} - -object AliyunBlobStorage { - val DefaultChunkSize: Int = 8192 - - def apply(config: Config, bucketId: String, timeProvider: TimeProvider)( - implicit ec: ExecutionContext): AliyunBlobStorage = { - val clientId = config.getString("storage.aliyun.clientId") - val clientSecret = config.getString("storage.aliyun.clientSecret") - val endpoint = config.getString("storage.aliyun.endpoint") - this(clientId, clientSecret, endpoint, bucketId, timeProvider) - } - - def apply(clientId: String, clientSecret: String, endpoint: String, bucketId: String, timeProvider: TimeProvider)( - implicit ec: ExecutionContext): AliyunBlobStorage = { - val client = new OSSClient(endpoint, clientId, clientSecret) - new AliyunBlobStorage(client, bucketId, timeProvider) - } -} diff --git a/src/main/scala/xyz/driver/core/storage/BlobStorage.scala b/src/main/scala/xyz/driver/core/storage/BlobStorage.scala deleted file mode 100644 index 0cde96a..0000000 --- a/src/main/scala/xyz/driver/core/storage/BlobStorage.scala +++ /dev/null @@ -1,50 +0,0 @@ -package xyz.driver.core.storage - -import java.net.URL -import java.nio.file.Path - -import akka.Done -import akka.stream.scaladsl.{Sink, Source} -import akka.util.ByteString - -import scala.concurrent.Future -import scala.concurrent.duration.Duration - -/** Binary key-value store, typically implemented by cloud storage. */ -trait BlobStorage { - - /** Upload data by value. */ - def uploadContent(name: String, content: Array[Byte]): Future[String] - - /** Upload data from an existing file. */ - def uploadFile(name: String, content: Path): Future[String] - - def exists(name: String): Future[Boolean] - - /** List available keys. The prefix determines which keys should be listed - * and depends on the implementation (for instance, a file system backed - * blob store will treat a prefix as a directory path). */ - def list(prefix: String): Future[Set[String]] - - /** Get all the content of a given object. */ - def content(name: String): Future[Option[Array[Byte]]] - - /** Stream data asynchronously and with backpressure. */ - def download(name: String): Future[Option[Source[ByteString, Any]]] - - /** Get a sink to upload data. */ - def upload(name: String): Future[Sink[ByteString, Future[Done]]] - - /** Delete a stored value. */ - def delete(name: String): Future[String] - - /** - * Path to specified resource. Checks that the resource exists and returns None if - * it is not found. Depending on the implementation, may throw. - */ - def url(name: String): Future[Option[URL]] -} - -trait SignedBlobStorage extends BlobStorage { - def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]] -} diff --git a/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala b/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala deleted file mode 100644 index e12c73d..0000000 --- a/src/main/scala/xyz/driver/core/storage/FileSystemBlobStorage.scala +++ /dev/null @@ -1,82 +0,0 @@ -package xyz.driver.core.storage - -import java.net.URL -import java.nio.file.{Files, Path, StandardCopyOption} - -import akka.stream.scaladsl.{FileIO, Sink, Source} -import akka.util.ByteString -import akka.{Done, NotUsed} - -import scala.collection.JavaConverters._ -import scala.concurrent.{ExecutionContext, Future} - -/** A blob store that is backed by a local filesystem. All objects are stored relative to the given - * root path. Slashes ('/') in blob names are treated as usual path separators and are converted - * to directories. */ -class FileSystemBlobStorage(root: Path)(implicit ec: ExecutionContext) extends BlobStorage { - - private def ensureParents(file: Path): Path = { - Files.createDirectories(file.getParent()) - file - } - - private def file(name: String) = root.resolve(name) - - override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future { - Files.write(ensureParents(file(name)), content) - name - } - override def uploadFile(name: String, content: Path): Future[String] = Future { - Files.copy(content, ensureParents(file(name)), StandardCopyOption.REPLACE_EXISTING) - name - } - - override def exists(name: String): Future[Boolean] = Future { - val path = file(name) - Files.exists(path) && Files.isReadable(path) - } - - override def list(prefix: String): Future[Set[String]] = Future { - val dir = file(prefix) - Files - .list(dir) - .iterator() - .asScala - .map(p => root.relativize(p)) - .map(_.toString) - .toSet - } - - override def content(name: String): Future[Option[Array[Byte]]] = exists(name) map { - case true => - Some(Files.readAllBytes(file(name))) - case false => None - } - - override def download(name: String): Future[Option[Source[ByteString, NotUsed]]] = Future { - if (Files.exists(file(name))) { - Some(FileIO.fromPath(file(name)).mapMaterializedValue(_ => NotUsed)) - } else { - None - } - } - - override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future { - val f = ensureParents(file(name)) - FileIO.toPath(f).mapMaterializedValue(_.map(_ => Done)) - } - - override def delete(name: String): Future[String] = exists(name).map { e => - if (e) { - Files.delete(file(name)) - } - name - } - - override def url(name: String): Future[Option[URL]] = exists(name) map { - case true => - Some(root.resolve(name).toUri.toURL) - case false => - None - } -} diff --git a/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala b/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala deleted file mode 100644 index 95164c7..0000000 --- a/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala +++ /dev/null @@ -1,96 +0,0 @@ -package xyz.driver.core.storage - -import java.io.{FileInputStream, InputStream} -import java.net.URL -import java.nio.file.Path - -import akka.Done -import akka.stream.scaladsl.Sink -import akka.util.ByteString -import com.google.api.gax.paging.Page -import com.google.auth.oauth2.ServiceAccountCredentials -import com.google.cloud.storage.Storage.BlobListOption -import com.google.cloud.storage.{Blob, BlobId, Bucket, Storage, StorageOptions} - -import scala.collection.JavaConverters._ -import scala.concurrent.duration.Duration -import scala.concurrent.{ExecutionContext, Future} - -class GcsBlobStorage(client: Storage, bucketId: String, chunkSize: Int = GcsBlobStorage.DefaultChunkSize)( - implicit ec: ExecutionContext) - extends BlobStorage with SignedBlobStorage { - - private val bucket: Bucket = client.get(bucketId) - require(bucket != null, s"Bucket $bucketId does not exist.") - - override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future { - bucket.create(name, content).getBlobId.getName - } - - override def uploadFile(name: String, content: Path): Future[String] = Future { - bucket.create(name, new FileInputStream(content.toFile)).getBlobId.getName - } - - override def exists(name: String): Future[Boolean] = Future { - bucket.get(name) != null - } - - override def list(prefix: String): Future[Set[String]] = Future { - val page: Page[Blob] = bucket.list(BlobListOption.prefix(prefix)) - page - .iterateAll() - .asScala - .map(_.getName()) - .toSet - } - - override def content(name: String): Future[Option[Array[Byte]]] = Future { - Option(bucket.get(name)).map(blob => blob.getContent()) - } - - override def download(name: String) = Future { - Option(bucket.get(name)).map { blob => - ChannelStream.fromChannel(() => blob.reader(), chunkSize) - } - } - - override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future { - val blob = bucket.create(name, Array.emptyByteArray) - ChannelStream.toChannel(() => blob.writer(), chunkSize) - } - - override def delete(name: String): Future[String] = Future { - client.delete(BlobId.of(bucketId, name)) - name - } - - override def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]] = Future { - Option(bucket.get(name)).map(blob => blob.signUrl(duration.length, duration.unit)) - } - - override def url(name: String): Future[Option[URL]] = Future { - val protocol: String = "https" - val resourcePath: String = s"storage.googleapis.com/${bucket.getName}/" - Option(bucket.get(name)).map { blob => - new URL(protocol, resourcePath, blob.getName) - } - } -} - -object GcsBlobStorage { - final val DefaultChunkSize = 8192 - - private def newClient(key: InputStream): Storage = - StorageOptions - .newBuilder() - .setCredentials(ServiceAccountCredentials.fromStream(key)) - .build() - .getService() - - def fromKeyfile(keyfile: Path, bucketId: String, chunkSize: Int = DefaultChunkSize)( - implicit ec: ExecutionContext): GcsBlobStorage = { - val client = newClient(new FileInputStream(keyfile.toFile)) - new GcsBlobStorage(client, bucketId, chunkSize) - } - -} diff --git a/src/main/scala/xyz/driver/core/storage/channelStreams.scala b/src/main/scala/xyz/driver/core/storage/channelStreams.scala deleted file mode 100644 index fc652be..0000000 --- a/src/main/scala/xyz/driver/core/storage/channelStreams.scala +++ /dev/null @@ -1,112 +0,0 @@ -package xyz.driver.core.storage - -import java.nio.ByteBuffer -import java.nio.channels.{ReadableByteChannel, WritableByteChannel} - -import akka.stream._ -import akka.stream.scaladsl.{Sink, Source} -import akka.stream.stage._ -import akka.util.ByteString -import akka.{Done, NotUsed} - -import scala.concurrent.{Future, Promise} -import scala.util.control.NonFatal - -class ChannelSource(createChannel: () => ReadableByteChannel, chunkSize: Int) - extends GraphStage[SourceShape[ByteString]] { - - val out = Outlet[ByteString]("ChannelSource.out") - val shape = SourceShape(out) - - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { - val channel = createChannel() - - object Handler extends OutHandler { - override def onPull(): Unit = { - try { - val buffer = ByteBuffer.allocate(chunkSize) - if (channel.read(buffer) > 0) { - buffer.flip() - push(out, ByteString.fromByteBuffer(buffer)) - } else { - completeStage() - } - } catch { - case NonFatal(_) => - channel.close() - } - } - override def onDownstreamFinish(): Unit = { - channel.close() - } - } - - setHandler(out, Handler) - } - -} - -class ChannelSink(createChannel: () => WritableByteChannel, chunkSize: Int) - extends GraphStageWithMaterializedValue[SinkShape[ByteString], Future[Done]] { - - val in = Inlet[ByteString]("ChannelSink.in") - val shape = SinkShape(in) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = { - val promise = Promise[Done]() - val logic = new GraphStageLogic(shape) { - val channel = createChannel() - - object Handler extends InHandler { - override def onPush(): Unit = { - try { - val data = grab(in) - channel.write(data.asByteBuffer) - pull(in) - } catch { - case NonFatal(e) => - channel.close() - promise.failure(e) - } - } - - override def onUpstreamFinish(): Unit = { - channel.close() - completeStage() - promise.success(Done) - } - - override def onUpstreamFailure(ex: Throwable): Unit = { - channel.close() - promise.failure(ex) - } - } - - setHandler(in, Handler) - - override def preStart(): Unit = { - pull(in) - } - } - (logic, promise.future) - } - -} - -object ChannelStream { - - def fromChannel(channel: () => ReadableByteChannel, chunkSize: Int = 8192): Source[ByteString, NotUsed] = { - Source - .fromGraph(new ChannelSource(channel, chunkSize)) - .withAttributes(Attributes(ActorAttributes.IODispatcher)) - .async - } - - def toChannel(channel: () => WritableByteChannel, chunkSize: Int = 8192): Sink[ByteString, Future[Done]] = { - Sink - .fromGraph(new ChannelSink(channel, chunkSize)) - .withAttributes(Attributes(ActorAttributes.IODispatcher)) - .async - } - -} |