From 124fd35527601c61c20f7f9e4a3d099c5e0b883b Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Tue, 25 Jul 2017 00:16:26 -0700 Subject: Add streaming download to file storage. This enables downloading of content without saving it to a file. --- .../scala/xyz/driver/core/file/GcsStorage.scala | 31 +++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) (limited to 'src/main/scala/xyz/driver/core/file/GcsStorage.scala') diff --git a/src/main/scala/xyz/driver/core/file/GcsStorage.scala b/src/main/scala/xyz/driver/core/file/GcsStorage.scala index deb8a0e..4ceab79 100644 --- a/src/main/scala/xyz/driver/core/file/GcsStorage.scala +++ b/src/main/scala/xyz/driver/core/file/GcsStorage.scala @@ -1,7 +1,12 @@ package xyz.driver.core.file +import akka.NotUsed +import akka.stream.scaladsl.Source +import akka.util.ByteString +import com.google.cloud.ReadChannel import java.io.{BufferedOutputStream, File, FileInputStream, FileOutputStream} import java.net.URL +import java.nio.ByteBuffer import java.nio.file.{Path, Paths} import java.util.concurrent.TimeUnit @@ -15,7 +20,10 @@ import scala.concurrent.duration.Duration import scala.concurrent.{ExecutionContext, Future} import scalaz.{ListT, OptionT} -class GcsStorage(storageClient: Storage, bucketName: Name[Bucket], executionContext: ExecutionContext) +class GcsStorage(storageClient: Storage, + bucketName: Name[Bucket], + executionContext: ExecutionContext, + chunkSize: Int = 4096) extends SignedFileStorage { implicit private val execution: ExecutionContext = executionContext @@ -48,6 +56,27 @@ class GcsStorage(storageClient: Storage, bucketName: Name[Bucket], executionCont }) } + override def stream(filePath: Path): OptionT[Future, Source[ByteString, NotUsed]] = + OptionT.optionT(Future { + def readChunk(rc: ReadChannel): Option[ByteString] = { + val buffer = ByteBuffer.allocate(chunkSize) + val length = rc.read(buffer) + if (length > 0) { + Some(ByteString.fromByteBuffer(buffer)) + } else { + None + } + } + + Option(storageClient.get(bucketName.value, filePath.toString)).map { blob => + Source.unfoldResource[ByteString, ReadChannel]( + create = () => blob.reader(), + read = channel => readChunk(channel), + close = channel => channel.close() + ) + } + }) + override def delete(filePath: Path): Future[Unit] = Future { storageClient.delete(BlobId.of(bucketName.value, filePath.toString)) } -- cgit v1.2.3