From 124fd35527601c61c20f7f9e4a3d099c5e0b883b Mon Sep 17 00:00:00 2001
From: Jakob Odersky
Date: Tue, 25 Jul 2017 00:16:26 -0700
Subject: Add streaming download to file storage.

This enables downloading of content without saving it to a file.
---
 .../xyz/driver/core/file/FileSystemStorage.scala   | 15 ++++++++++
 .../scala/xyz/driver/core/file/GcsStorage.scala    | 36 +++++++++++++++++++++-
 .../scala/xyz/driver/core/file/S3Storage.scala     | 14 ++++++++-
 src/main/scala/xyz/driver/core/file/package.scala  |  6 +++++
 4 files changed, 69 insertions(+), 2 deletions(-)

Review note: the stream hunks below were amended after the original commit
(the GCS read buffer is flipped before it is copied, the file system check
requires a regular file, and Scaladoc was added). The commit id above and the
"index" hashes below still refer to the original commit.

diff --git a/src/main/scala/xyz/driver/core/file/FileSystemStorage.scala b/src/main/scala/xyz/driver/core/file/FileSystemStorage.scala
index fab1307..0d49f2d 100644
--- a/src/main/scala/xyz/driver/core/file/FileSystemStorage.scala
+++ b/src/main/scala/xyz/driver/core/file/FileSystemStorage.scala
@@ -1,5 +1,8 @@
 package xyz.driver.core.file
 
+import akka.NotUsed
+import akka.stream.scaladsl.{FileIO, Source}
+import akka.util.ByteString
 import java.io.File
 import java.nio.file.{Files, Path, Paths}
 
@@ -33,6 +36,18 @@ class FileSystemStorage(executionContext: ExecutionContext) extends FileStorage
       Option(new File(filePath.toString)).filter(file => file.exists() && file.isFile)
     })
 
+  /** Stream the contents of a regular file without copying it elsewhere first.
+    * Yields None when the path does not exist or is not a regular file. */
+  override def stream(filePath: Path): OptionT[Future, Source[ByteString, NotUsed]] =
+    OptionT.optionT(Future {
+      // Same regular-file check as download, so a directory path yields None.
+      if (Files.isRegularFile(filePath)) {
+        Some(FileIO.fromPath(filePath).mapMaterializedValue(_ => NotUsed))
+      } else {
+        None
+      }
+    })
+
   override def delete(filePath: Path): Future[Unit] = Future {
     val file = new File(filePath.toString)
     if (file.delete()) ()
diff --git a/src/main/scala/xyz/driver/core/file/GcsStorage.scala b/src/main/scala/xyz/driver/core/file/GcsStorage.scala
index deb8a0e..4ceab79 100644
--- a/src/main/scala/xyz/driver/core/file/GcsStorage.scala
+++ b/src/main/scala/xyz/driver/core/file/GcsStorage.scala
@@ -1,7 +1,12 @@
 package xyz.driver.core.file
 
+import akka.NotUsed
+import akka.stream.scaladsl.Source
+import akka.util.ByteString
+import com.google.cloud.ReadChannel
 import java.io.{BufferedOutputStream, File, FileInputStream, FileOutputStream}
 import java.net.URL
+import java.nio.ByteBuffer
 import java.nio.file.{Path, Paths}
 import java.util.concurrent.TimeUnit
 
@@ -15,7 +20,10 @@ import scala.concurrent.duration.Duration
 import scala.concurrent.{ExecutionContext, Future}
 import scalaz.{ListT, OptionT}
 
-class GcsStorage(storageClient: Storage, bucketName: Name[Bucket], executionContext: ExecutionContext)
+class GcsStorage(storageClient: Storage,
+                 bucketName: Name[Bucket],
+                 executionContext: ExecutionContext,
+                 chunkSize: Int = 4096)
     extends SignedFileStorage {
   implicit private val execution: ExecutionContext = executionContext
 
@@ -48,6 +56,32 @@ class GcsStorage(storageClient: Storage, bucketName: Name[Bucket], executionCont
     })
   }
 
+  /** Stream the contents of a blob in chunks of at most `chunkSize` bytes.
+    * Yields None when storageClient.get returns null for the given path. */
+  override def stream(filePath: Path): OptionT[Future, Source[ByteString, NotUsed]] =
+    OptionT.optionT(Future {
+      def readChunk(rc: ReadChannel): Option[ByteString] = {
+        val buffer = ByteBuffer.allocate(chunkSize)
+        val length = rc.read(buffer)
+        if (length > 0) {
+          // read() leaves the position just past the bytes it wrote; flip so that
+          // fromByteBuffer copies exactly those bytes and not the unused tail.
+          buffer.flip()
+          Some(ByteString.fromByteBuffer(buffer))
+        } else {
+          None
+        }
+      }
+
+      Option(storageClient.get(bucketName.value, filePath.toString)).map { blob =>
+        Source.unfoldResource[ByteString, ReadChannel](
+          create = () => blob.reader(),
+          read = channel => readChunk(channel),
+          close = channel => channel.close()
+        )
+      }
+    })
+
   override def delete(filePath: Path): Future[Unit] = Future {
     storageClient.delete(BlobId.of(bucketName.value, filePath.toString))
   }
diff --git a/src/main/scala/xyz/driver/core/file/S3Storage.scala b/src/main/scala/xyz/driver/core/file/S3Storage.scala
index 7df3db2..49bebc8 100644
--- a/src/main/scala/xyz/driver/core/file/S3Storage.scala
+++ b/src/main/scala/xyz/driver/core/file/S3Storage.scala
@@ -1,5 +1,8 @@
 package xyz.driver.core.file
 
+import akka.NotUsed
+import akka.stream.scaladsl.{Source, StreamConverters}
+import akka.util.ByteString
 import java.io.File
 import java.nio.file.{Path, Paths}
 import java.util.UUID.randomUUID
@@ -12,7 +15,8 @@ import xyz.driver.core.time.Time
 import scala.concurrent.{ExecutionContext, Future}
 import scalaz.{ListT, OptionT}
 
-class S3Storage(s3: AmazonS3, bucket: Name[Bucket], executionContext: ExecutionContext) extends FileStorage {
+class S3Storage(s3: AmazonS3, bucket: Name[Bucket], executionContext: ExecutionContext, chunkSize: Int = 4096)
+    extends FileStorage {
   implicit private val execution = executionContext
 
   override def upload(localSource: File, destination: Path): Future[Unit] = Future {
@@ -36,6 +40,14 @@ class S3Storage(s3: AmazonS3, bucket: Name[Bucket], executionContext: ExecutionC
       }
     })
 
+  /** Stream the contents of an object, reading it in chunks of `chunkSize` bytes. */
+  override def stream(filePath: Path): OptionT[Future, Source[ByteString, NotUsed]] =
+    OptionT.optionT(Future {
+      Option(s3.getObject(new GetObjectRequest(bucket.value, filePath.toString))).map { elem =>
+        StreamConverters.fromInputStream(elem.getObjectContent, chunkSize).mapMaterializedValue(_ => NotUsed)
+      }
+    })
+
   override def delete(filePath: Path): Future[Unit] = Future {
     s3.deleteObject(bucket.value, filePath.toString)
   }
diff --git a/src/main/scala/xyz/driver/core/file/package.scala b/src/main/scala/xyz/driver/core/file/package.scala
index b2c679e..dd0998d 100644
--- a/src/main/scala/xyz/driver/core/file/package.scala
+++ b/src/main/scala/xyz/driver/core/file/package.scala
@@ -10,6 +10,9 @@ import scalaz.{ListT, OptionT}
 
 package file {
 
+  import akka.NotUsed
+  import akka.stream.scaladsl.Source
+  import akka.util.ByteString
   import java.net.URL
 
   import scala.concurrent.duration.Duration
@@ -35,6 +38,9 @@ package file {
 
     def download(filePath: Path): OptionT[Future, File]
 
+    /** Stream the contents of a file without saving it to a local file first */
+    def stream(filePath: Path): OptionT[Future, Source[ByteString, NotUsed]]
+
     def delete(filePath: Path): Future[Unit]
 
     /** List contents of a directory */
-- 
cgit v1.2.3