From 124fd35527601c61c20f7f9e4a3d099c5e0b883b Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Tue, 25 Jul 2017 00:16:26 -0700 Subject: Add streaming download to file storage. This enables downloading of content without saving it to a file. --- src/main/scala/xyz/driver/core/file/S3Storage.scala | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) (limited to 'src/main/scala/xyz/driver/core/file/S3Storage.scala') diff --git a/src/main/scala/xyz/driver/core/file/S3Storage.scala b/src/main/scala/xyz/driver/core/file/S3Storage.scala index 7df3db2..49bebc8 100644 --- a/src/main/scala/xyz/driver/core/file/S3Storage.scala +++ b/src/main/scala/xyz/driver/core/file/S3Storage.scala @@ -1,5 +1,8 @@ package xyz.driver.core.file +import akka.NotUsed +import akka.stream.scaladsl.{Source, StreamConverters} +import akka.util.ByteString import java.io.File import java.nio.file.{Path, Paths} import java.util.UUID.randomUUID @@ -12,7 +15,8 @@ import xyz.driver.core.time.Time import scala.concurrent.{ExecutionContext, Future} import scalaz.{ListT, OptionT} -class S3Storage(s3: AmazonS3, bucket: Name[Bucket], executionContext: ExecutionContext) extends FileStorage { +class S3Storage(s3: AmazonS3, bucket: Name[Bucket], executionContext: ExecutionContext, chunkSize: Int = 4096) + extends FileStorage { implicit private val execution = executionContext override def upload(localSource: File, destination: Path): Future[Unit] = Future { @@ -36,6 +40,13 @@ class S3Storage(s3: AmazonS3, bucket: Name[Bucket], executionContext: ExecutionC } }) + override def stream(filePath: Path): OptionT[Future, Source[ByteString, NotUsed]] = + OptionT.optionT(Future { + Option(s3.getObject(new GetObjectRequest(bucket.value, filePath.toString))).map { elem => + StreamConverters.fromInputStream(elem.getObjectContent, chunkSize).mapMaterializedValue(_ => NotUsed) + } + }) + override def delete(filePath: Path): Future[Unit] = Future { s3.deleteObject(bucket.value, filePath.toString) } -- cgit v1.2.3