aboutsummaryrefslogtreecommitdiff
path: root/jvm/src/main/scala/xyz/driver/core/file
diff options
context:
space:
mode:
Diffstat (limited to 'jvm/src/main/scala/xyz/driver/core/file')
-rw-r--r--jvm/src/main/scala/xyz/driver/core/file/FileSystemStorage.scala76
-rw-r--r--jvm/src/main/scala/xyz/driver/core/file/GcsStorage.scala135
-rw-r--r--jvm/src/main/scala/xyz/driver/core/file/S3Storage.scala87
-rw-r--r--jvm/src/main/scala/xyz/driver/core/file/package.scala68
4 files changed, 366 insertions, 0 deletions
diff --git a/jvm/src/main/scala/xyz/driver/core/file/FileSystemStorage.scala b/jvm/src/main/scala/xyz/driver/core/file/FileSystemStorage.scala
new file mode 100644
index 0000000..ce26fe4
--- /dev/null
+++ b/jvm/src/main/scala/xyz/driver/core/file/FileSystemStorage.scala
@@ -0,0 +1,76 @@
+package xyz.driver.core.file
+
+import akka.NotUsed
+import akka.stream.scaladsl.{FileIO, Source}
+import akka.util.ByteString
+import java.io.File
+import java.nio.file.{Files, Path, Paths}
+
+import xyz.driver.core.{Name, Revision}
+import xyz.driver.core.time.Time
+
+import scala.concurrent.{ExecutionContext, Future}
+import scalaz.{ListT, OptionT}
+
+/** [[FileStorage]] implementation that keeps files on the local file system.
+  *
+  * Every operation is wrapped in a `Future` that runs on the supplied execution context.
+  *
+  * @param executionContext context on which the file system calls are executed
+  */
+@deprecated("Consider using xyz.driver.core.storage.FileSystemBlobStorage instead", "driver-core 1.8.14")
+class FileSystemStorage(executionContext: ExecutionContext) extends FileStorage {
+  implicit private val execution: ExecutionContext = executionContext
+
+  /** Moves `localSource` to `destination`, creating missing parent directories first.
+    *
+    * The returned future fails with `IllegalArgumentException` when `checkSafeFileName` rejects the
+    * destination, and with `Exception` when the parent directories cannot be created or the move fails.
+    */
+  override def upload(localSource: File, destination: Path): Future[Unit] = Future {
+    checkSafeFileName(destination) {
+      val destinationFile = destination.toFile
+      // `getParentFile` is null for a path without a parent component (e.g. a bare file name);
+      // in that case there is no directory to create.
+      val parentReady =
+        Option(destinationFile.getParentFile).forall(parent => parent.exists() || parent.mkdirs())
+
+      if (!parentReady) {
+        throw new Exception(s"Failed to create parent directories for file `${destinationFile.getCanonicalPath}`")
+      } else if (!localSource.renameTo(destinationFile)) {
+        throw new Exception(
+          s"Failed to move file from `${localSource.getCanonicalPath}` to `${destinationFile.getCanonicalPath}`")
+      }
+    }
+  }
+
+  /** Returns the file at `filePath`, or `None` when it does not exist or is not a regular file. */
+  override def download(filePath: Path): OptionT[Future, File] =
+    OptionT.optionT(Future {
+      Option(new File(filePath.toString)).filter(file => file.exists() && file.isFile)
+    })
+
+  /** Returns a byte source reading `filePath`, or `None` when it is not an existing regular file. */
+  override def stream(filePath: Path): OptionT[Future, Source[ByteString, NotUsed]] =
+    OptionT.optionT(Future {
+      // Same acceptance rule as `download`: directories and missing paths are reported as absent.
+      if (Files.isRegularFile(filePath)) {
+        Some(FileIO.fromPath(filePath).mapMaterializedValue(_ => NotUsed))
+      } else {
+        None
+      }
+    })
+
+  /** Deletes the file at `filePath`; the future fails with `Exception` when the deletion is not performed. */
+  override def delete(filePath: Path): Future[Unit] = Future {
+    val file = new File(filePath.toString)
+    if (file.delete()) ()
+    else {
+      throw new Exception(s"Failed to delete file $file" + (if (!file.exists()) ", file does not exist." else "."))
+    }
+  }
+
+  /** Lists the regular files located directly inside `path`.
+    *
+    * Sub-directories are left out, and the result is empty when `path` cannot be listed as a
+    * directory. The revision of each link is the string form of the `File` object's hash code.
+    */
+  override def list(path: Path): ListT[Future, FileLink] =
+    ListT.listT(Future {
+      val directory = new File(path.toString)
+      // `listFiles` returns null both when `directory` is not a directory and when an I/O error occurs.
+      Option(directory.listFiles()).fold(List.empty[FileLink]) { entries =>
+        entries.toList.filter(_.isFile).map { file =>
+          FileLink(
+            Name[File](file.getName),
+            Paths.get(file.getPath),
+            Revision[File](file.hashCode.toString),
+            Time(file.lastModified()),
+            file.length())
+        }
+      }
+    })
+
+  /** Tells whether anything (file or directory) exists at `path`. */
+  override def exists(path: Path): Future[Boolean] = Future {
+    Files.exists(path)
+  }
+
+}
diff --git a/jvm/src/main/scala/xyz/driver/core/file/GcsStorage.scala b/jvm/src/main/scala/xyz/driver/core/file/GcsStorage.scala
new file mode 100644
index 0000000..5c94645
--- /dev/null
+++ b/jvm/src/main/scala/xyz/driver/core/file/GcsStorage.scala
@@ -0,0 +1,135 @@
+package xyz.driver.core.file
+
+import akka.NotUsed
+import akka.stream.scaladsl.Source
+import akka.util.ByteString
+import com.google.cloud.ReadChannel
+import java.io.{BufferedOutputStream, File, FileInputStream, FileOutputStream}
+import java.net.URL
+import java.nio.ByteBuffer
+import java.nio.file.{Path, Paths}
+import java.util.concurrent.TimeUnit
+
+import com.google.cloud.storage.Storage.BlobListOption
+import com.google.cloud.storage.{Option => _, _}
+import xyz.driver.core.time.Time
+import xyz.driver.core.{Name, Revision, generators}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{ExecutionContext, Future}
+import scalaz.{ListT, OptionT}
+
+/** [[SignedFileStorage]] implementation backed by a Google Cloud Storage bucket.
+  *
+  * Every operation is wrapped in a `Future` that runs on the supplied execution context.
+  *
+  * @param storageClient    client used for every bucket and blob call
+  * @param bucketName       name of the bucket that holds the files
+  * @param executionContext context on which the storage calls are executed
+  * @param chunkSize        size in bytes of the buffer allocated for each read in `stream`
+  */
+@deprecated("Consider using xyz.driver.core.storage.GcsBlobStorage instead", "driver-core 1.8.14")
+class GcsStorage(
+    storageClient: Storage,
+    bucketName: Name[Bucket],
+    executionContext: ExecutionContext,
+    chunkSize: Int = 4096)
+    extends SignedFileStorage {
+  implicit private val execution: ExecutionContext = executionContext
+
+  /** Uploads the content of `localSource` as the blob named `destination`.
+    *
+    * The returned future fails with `IllegalArgumentException` when `checkSafeFileName` rejects the
+    * destination, and with `IllegalStateException` when the bucket lookup returns nothing.
+    */
+  override def upload(localSource: File, destination: Path): Future[Unit] = Future {
+    checkSafeFileName(destination) {
+      val blobId = BlobId.of(bucketName.value, destination.toString)
+      // NOTE(review): blobs are created with the PUBLIC_READ predefined ACL; confirm this exposure is intended.
+      val acl = Bucket.BlobWriteOption.predefinedAcl(Storage.PredefinedAcl.PUBLIC_READ)
+
+      // The client hands back null for an unknown bucket; fail with a readable message instead of an NPE.
+      val targetBucket = Option(storageClient.get(bucketName.value)).getOrElse(
+        throw new IllegalStateException(s"Bucket `${bucketName.value}` does not exist"))
+
+      val content = new FileInputStream(localSource)
+      try {
+        val _ = targetBucket.create(blobId.getName, content, acl)
+      } finally content.close() // release the file handle whether or not the upload succeeded
+    }
+  }
+
+  /** Downloads the blob at `filePath` into a freshly created temporary directory.
+    *
+    * Yields `None` when the blob is missing or has size zero. The whole content is loaded into
+    * memory before it is written to the temporary file.
+    */
+  override def download(filePath: Path): OptionT[Future, File] = {
+    OptionT.optionT(Future {
+      Option(storageClient.get(bucketName.value, filePath.toString)).filterNot(_.getSize == 0).map {
+        blob =>
+          val tempDir             = System.getProperty("java.io.tmpdir")
+          val randomFolderName    = generators.nextUuid().toString
+          val tempDestinationFile = new File(Paths.get(tempDir, randomFolderName, filePath.toString).toString)
+
+          if (!tempDestinationFile.getParentFile.mkdirs()) {
+            throw new Exception(s"Failed to create temp directory to download file `$tempDestinationFile`")
+          } else {
+            val target = new BufferedOutputStream(new FileOutputStream(tempDestinationFile))
+            try target.write(blob.getContent())
+            finally target.close()
+            tempDestinationFile
+          }
+      }
+    })
+  }
+
+  /** Returns a source that reads the blob at `filePath` in buffers of `chunkSize` bytes, or `None` when the
+    * blob lookup returns nothing.
+    */
+  override def stream(filePath: Path): OptionT[Future, Source[ByteString, NotUsed]] =
+    OptionT.optionT(Future {
+      // Reads the next buffer from the channel; a non-positive read count ends the stream.
+      def readChunk(rc: ReadChannel): Option[ByteString] = {
+        val buffer = ByteBuffer.allocate(chunkSize)
+        val length = rc.read(buffer)
+        if (length > 0) {
+          buffer.flip()
+          Some(ByteString.fromByteBuffer(buffer))
+        } else {
+          None
+        }
+      }
+
+      Option(storageClient.get(bucketName.value, filePath.toString)).map { blob =>
+        // The read channel is opened on materialization and closed when the stream terminates.
+        Source.unfoldResource[ByteString, ReadChannel](
+          create = () => blob.reader(),
+          read = channel => readChunk(channel),
+          close = channel => channel.close()
+        )
+      }
+    })
+
+  /** Requests deletion of the blob at `filePath`; the client's result value is not inspected. */
+  override def delete(filePath: Path): Future[Unit] = Future {
+    storageClient.delete(BlobId.of(bucketName.value, filePath.toString))
+  }
+
+  /** Lists the blobs whose names start with `directoryPath/`, using the client's current-directory
+    * listing mode. The entry named exactly like the directory prefix is skipped.
+    */
+  override def list(directoryPath: Path): ListT[Future, FileLink] =
+    ListT.listT(Future {
+      val directory = s"$directoryPath/"
+      val page = storageClient.list(
+        bucketName.value,
+        BlobListOption.currentDirectory(),
+        BlobListOption.prefix(directory)
+      )
+
+      page
+        .iterateAll()
+        .asScala
+        .filter(_.getName != directory)
+        .map(blobToFileLink(directoryPath, _))
+        .toList
+    })
+
+  /** Converts a blob into a [[FileLink]].
+    *
+    * @throws IllegalStateException when the blob has no name, generation, update time or size
+    */
+  protected def blobToFileLink(path: Path, blob: Blob): FileLink = {
+    def nullError(property: String) = throw new IllegalStateException(s"Blob $blob at $path does not have $property")
+    val name                        = Option(blob.getName).getOrElse(nullError("a name"))
+    val generation                  = Option(blob.getGeneration).getOrElse(nullError("a generation"))
+    val updateTime                  = Option(blob.getUpdateTime).getOrElse(nullError("an update time"))
+    val size                        = Option(blob.getSize).getOrElse(nullError("a size"))
+
+    FileLink(
+      Name(name.split('/').last), // last path segment of the blob name
+      Paths.get(name),
+      Revision(generation.toString),
+      Time(updateTime),
+      size
+    )
+  }
+
+  /** Tells whether a blob lookup for `path` returns something. */
+  override def exists(path: Path): Future[Boolean] = Future {
+    val blob = Option(
+      storageClient.get(
+        bucketName.value,
+        path.toString
+      ))
+    blob.isDefined
+  }
+
+  /** Returns a signed URL for the blob at `filePath`, valid for `duration` (passed on in whole seconds).
+    * Yields `None` when the blob is missing or has size zero.
+    */
+  override def signedFileUrl(filePath: Path, duration: Duration): OptionT[Future, URL] =
+    OptionT.optionT(Future {
+      Option(storageClient.get(bucketName.value, filePath.toString)).filterNot(_.getSize == 0).map { blob =>
+        blob.signUrl(duration.toSeconds, TimeUnit.SECONDS)
+      }
+    })
+}
diff --git a/jvm/src/main/scala/xyz/driver/core/file/S3Storage.scala b/jvm/src/main/scala/xyz/driver/core/file/S3Storage.scala
new file mode 100644
index 0000000..5158d4d
--- /dev/null
+++ b/jvm/src/main/scala/xyz/driver/core/file/S3Storage.scala
@@ -0,0 +1,87 @@
+package xyz.driver.core.file
+
+import akka.NotUsed
+import akka.stream.scaladsl.{Source, StreamConverters}
+import akka.util.ByteString
+import java.io.File
+import java.nio.file.{Path, Paths}
+import java.util.UUID.randomUUID
+
+import com.amazonaws.services.s3.AmazonS3
+import com.amazonaws.services.s3.model.{Bucket, GetObjectRequest, ListObjectsV2Request}
+import xyz.driver.core.{Name, Revision}
+import xyz.driver.core.time.Time
+
+import scala.concurrent.{ExecutionContext, Future}
+import scalaz.{ListT, OptionT}
+
+/** [[FileStorage]] implementation backed by an Amazon S3 bucket.
+  *
+  * Every operation is wrapped in a `Future` that runs on the supplied execution context.
+  *
+  * @param s3               client used for every S3 call
+  * @param bucket           name of the bucket that holds the files
+  * @param executionContext context on which the S3 calls are executed
+  * @param chunkSize        chunk size in bytes passed to the input-stream source built by `stream`
+  */
+@deprecated(
+  "Blob storage functionality has been reimplemented in xyz.driver.core.storage.BlobStorage. " +
+    "It has not been ported to S3 storage. Please raise an issue if this required for your use-case.",
+  "driver-core 1.8.14"
+)
+class S3Storage(s3: AmazonS3, bucket: Name[Bucket], executionContext: ExecutionContext, chunkSize: Int = 4096)
+    extends FileStorage {
+  implicit private val execution: ExecutionContext = executionContext
+
+  /** Evaluates `fetch`, turning an S3 error response with HTTP status 404 into `None`.
+    * Any other failure is propagated unchanged.
+    */
+  private def unlessMissing[T](fetch: => Option[T]): Option[T] =
+    try fetch
+    catch {
+      case e: com.amazonaws.services.s3.model.AmazonS3Exception if e.getStatusCode == 404 => None
+    }
+
+  /** Uploads `localSource` under the key `destination`.
+    *
+    * The returned future fails with `IllegalArgumentException` when `checkSafeFileName` rejects the destination.
+    */
+  override def upload(localSource: File, destination: Path): Future[Unit] = Future {
+    checkSafeFileName(destination) {
+      val _ = s3.putObject(bucket.value, destination.toString, localSource).getETag
+    }
+  }
+
+  /** Downloads the object at `filePath` into a freshly created temporary directory.
+    *
+    * Yields `None` when S3 reports the object as not found (HTTP 404) or the client returns no object.
+    */
+  override def download(filePath: Path): OptionT[Future, File] =
+    OptionT.optionT(Future {
+      val tempDir             = System.getProperty("java.io.tmpdir")
+      val randomFolderName    = randomUUID().toString
+      val tempDestinationFile = new File(Paths.get(tempDir, randomFolderName, filePath.toString).toString)
+
+      if (!tempDestinationFile.getParentFile.mkdirs()) {
+        throw new Exception(s"Failed to create temp directory to download file `$tempDestinationFile`")
+      } else {
+        unlessMissing {
+          Option(s3.getObject(new GetObjectRequest(bucket.value, filePath.toString), tempDestinationFile)).map { _ =>
+            tempDestinationFile
+          }
+        }
+      }
+    })
+
+  /** Returns a source over the content of the object at `filePath`.
+    *
+    * Yields `None` when S3 reports the object as not found (HTTP 404) or the client returns no object.
+    */
+  override def stream(filePath: Path): OptionT[Future, Source[ByteString, NotUsed]] =
+    OptionT.optionT(Future {
+      unlessMissing {
+        Option(s3.getObject(new GetObjectRequest(bucket.value, filePath.toString))).map { elem =>
+          StreamConverters.fromInputStream(() => elem.getObjectContent(), chunkSize).mapMaterializedValue(_ => NotUsed)
+        }
+      }
+    })
+
+  /** Requests deletion of the object at `filePath`. */
+  override def delete(filePath: Path): Future[Unit] = Future {
+    s3.deleteObject(bucket.value, filePath.toString)
+  }
+
+  /** Lists the objects whose keys start with `path`, following continuation tokens until the
+    * listing is no longer truncated. Links that `isInSubFolder` classifies as nested are dropped.
+    */
+  override def list(path: Path): ListT[Future, FileLink] =
+    ListT.listT(Future {
+      import scala.collection.JavaConverters._
+      // No explicit page size: the service default is used and paging is handled below.
+      val req = new ListObjectsV2Request().withBucketName(bucket.value).withPrefix(path.toString)
+
+      def isInSubFolder(path: Path)(fileLink: FileLink) =
+        fileLink.location.toString.replace(path.toString + "/", "").contains("/")
+
+      // Fetches one page, appends its links, and continues while S3 flags the result as truncated.
+      // The page that ends the listing (not truncated) is included as well.
+      @scala.annotation.tailrec
+      def collectPages(collected: Vector[FileLink]): Vector[FileLink] = {
+        val result = s3.listObjectsV2(req)
+        val pageLinks = result.getObjectSummaries.asScala.toList.map { summary =>
+          FileLink(
+            Name[File](summary.getKey),
+            // NOTE(review): the key presumably already contains the listing prefix, so prepending
+            // `path` may duplicate it in `location`; kept as-is, verify against callers.
+            Paths.get(path.toString + "/" + summary.getKey),
+            Revision[File](summary.getETag),
+            Time(summary.getLastModified.getTime),
+            summary.getSize
+          )
+        } filterNot isInSubFolder(path)
+
+        val all = collected ++ pageLinks
+        if (result.isTruncated) {
+          req.setContinuationToken(result.getNextContinuationToken)
+          collectPages(all)
+        } else all
+      }
+
+      collectPages(Vector.empty).toList
+    })
+
+  /** Tells whether an object exists under the key `path`. */
+  override def exists(path: Path): Future[Boolean] = Future {
+    s3.doesObjectExist(bucket.value, path.toString)
+  }
+
+}
diff --git a/jvm/src/main/scala/xyz/driver/core/file/package.scala b/jvm/src/main/scala/xyz/driver/core/file/package.scala
new file mode 100644
index 0000000..58955e5
--- /dev/null
+++ b/jvm/src/main/scala/xyz/driver/core/file/package.scala
@@ -0,0 +1,68 @@
+package xyz.driver.core
+
+import java.io.File
+import java.nio.file.Path
+
+import xyz.driver.core.time.Time
+
+import scala.concurrent.Future
+import scalaz.{ListT, OptionT}
+
+package file {
+
+ import akka.NotUsed
+ import akka.stream.scaladsl.Source
+ import akka.util.ByteString
+ import java.net.URL
+
+ import scala.concurrent.duration.Duration
+
+ /** Metadata describing one stored file, as returned by `FileStorage.list`.
+ *
+ * @param name name of the file
+ * @param location path of the file inside the storage
+ * @param revision storage-specific revision marker of the file
+ * @param lastModificationDate time of the last modification
+ * @param fileSize size of the file (presumably in bytes; the storages in this package pass byte counts)
+ */
+ final case class FileLink(
+ name: Name[File],
+ location: Path,
+ revision: Revision[File],
+ lastModificationDate: Time,
+ fileSize: Long
+ )
+
+ /** Synchronous lookup of files by name. */
+ trait FileService {
+
+ /** Returns the [[FileLink]] associated with the given file name. */
+ def getFileLink(id: Name[File]): FileLink
+
+ /** Returns the file that the given link refers to. */
+ def getFile(fileLink: FileLink): File
+ }
+
+ trait FileStorage {
+
+   /** Stores the content of `localSource` under `destination`. */
+   def upload(localSource: File, destination: Path): Future[Unit]
+
+   /** Retrieves the file stored at `filePath` as a local file, if there is one. */
+   def download(filePath: Path): OptionT[Future, File]
+
+   /** Retrieves the content stored at `filePath` as a stream of bytes, if there is one. */
+   def stream(filePath: Path): OptionT[Future, Source[ByteString, NotUsed]]
+
+   /** Removes the file stored at `filePath`. */
+   def delete(filePath: Path): Future[Unit]
+
+   /** List contents of a directory */
+   def list(directoryPath: Path): ListT[Future, FileLink]
+
+   /** Tells whether something is stored at `path`. */
+   def exists(path: Path): Future[Boolean]
+
+   /** Characters that must not appear in a file path; taken from the list of characters to avoid in S3 keys.
+     *
+     * @see http://stackoverflow.com/questions/7116450/what-are-valid-s3-key-names-that-can-be-accessed-via-the-s3-rest-api
+     */
+   private val illegalChars = "\\^`><{}][#%~|&@:,$=+?; "
+
+   /** Evaluates `f` only when `filePath` is free of illegal characters.
+     *
+     * @throws IllegalArgumentException naming the first illegal character found in the path
+     */
+   protected def checkSafeFileName[T](filePath: Path)(f: => T): T = {
+     val firstOffender = filePath.toString.find(ch => illegalChars.indexOf(ch) >= 0)
+     firstOffender.foreach { ch =>
+       throw new IllegalArgumentException(s"File name cannot contain character `$ch`")
+     }
+     f
+   }
+ }
+
+ /** A [[FileStorage]] that can additionally hand out signed URLs for its files. */
+ trait SignedFileStorage extends FileStorage {
+ /** Returns a signed URL for the file at `filePath`, if there is one; `duration` is the requested validity period. */
+ def signedFileUrl(filePath: Path, duration: Duration): OptionT[Future, URL]
+ }
+}