package xyz.driver.core.storage
import java.io.{FileInputStream, InputStream}
import java.net.URL
import java.nio.file.Path
import akka.Done
import akka.stream.scaladsl.Sink
import akka.util.ByteString
import com.google.api.gax.paging.Page
import com.google.auth.oauth2.ServiceAccountCredentials
import com.google.cloud.storage.Storage.BlobListOption
import com.google.cloud.storage.{Blob, BlobId, Bucket, Storage, StorageOptions}
import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.{ExecutionContext, Future}
/** Blob storage backed by a single Google Cloud Storage bucket.
  *
  * The bucket is looked up once, at construction time; construction fails with an
  * `IllegalArgumentException` when `client.get(bucketId)` returns `null`. Every operation
  * wraps its GCS client call in a `Future` that runs on the supplied execution context.
  *
  * @param client    GCS client used to resolve the bucket and to delete blobs
  * @param bucketId  name of the bucket that all blob names are resolved against
  * @param chunkSize chunk size handed to `ChannelStream` for streamed downloads and uploads
  * @param ec        execution context on which the client calls are executed
  */
class GcsBlobStorage(client: Storage, bucketId: String, chunkSize: Int = GcsBlobStorage.DefaultChunkSize)(
    implicit ec: ExecutionContext)
    extends BlobStorage with SignedBlobStorage {

  private val bucket: Bucket = client.get(bucketId)
  require(bucket != null, s"Bucket $bucketId does not exist.")

  /** Creates a blob called `name` holding `content` and returns the name of the created blob. */
  override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future {
    bucket.create(name, content).getBlobId.getName
  }

  /** Creates a blob called `name` from the file at `content` and returns the name of the created blob.
    *
    * The file stream is closed once the create call returns or throws, so no file descriptor
    * is left open by this method.
    */
  override def uploadFile(name: String, content: Path): Future[String] = Future {
    val in = new FileInputStream(content.toFile)
    try bucket.create(name, in).getBlobId.getName
    finally in.close()
  }

  /** Returns `true` when the bucket lookup for `name` yields a non-null blob. */
  override def exists(name: String): Future[Boolean] = Future {
    bucket.get(name) != null
  }

  /** Returns the names of all blobs in the bucket whose name starts with `prefix`. */
  override def list(prefix: String): Future[Set[String]] = Future {
    val page: Page[Blob] = bucket.list(BlobListOption.prefix(prefix))
    page
      .iterateAll()
      .asScala
      .map(_.getName())
      .toSet
  }

  /** Returns the full content of blob `name`, or `None` when the bucket lookup returns `null`. */
  override def content(name: String): Future[Option[Array[Byte]]] = Future {
    Option(bucket.get(name)).map(blob => blob.getContent())
  }

  /** Returns a stream built by `ChannelStream.fromChannel` over the blob's reader,
    * or `None` when the bucket lookup for `name` returns `null`.
    */
  override def download(name: String) = Future {
    Option(bucket.get(name)).map { blob =>
      ChannelStream.fromChannel(() => blob.reader(), chunkSize)
    }
  }

  /** Creates an empty blob called `name`, then returns a sink built by
    * `ChannelStream.toChannel` over that blob's writer.
    */
  override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future {
    val blob = bucket.create(name, Array.emptyByteArray)
    ChannelStream.toChannel(() => blob.writer(), chunkSize)
  }

  /** Issues a delete for blob `name` and returns `name`.
    *
    * The result of the client's delete call is discarded, so `name` is returned
    * regardless of what that call reports.
    */
  override def delete(name: String): Future[String] = Future {
    client.delete(BlobId.of(bucketId, name))
    name
  }

  /** Returns a signed URL for blob `name` valid for `duration`, or `None` when the
    * bucket lookup returns `null`.
    */
  override def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]] = Future {
    Option(bucket.get(name)).map(blob => blob.signUrl(duration.length, duration.unit))
  }

  /** Returns `https://storage.googleapis.com/<bucket>/<blob name>` for blob `name`,
    * or `None` when the bucket lookup returns `null`.
    *
    * The host and the path are passed to `URL` as separate components so that `getHost`
    * and `getPath` on the result are meaningful; the blob name is not percent-encoded.
    */
  override def url(name: String): Future[Option[URL]] = Future {
    val protocol: String = "https"
    val host: String     = "storage.googleapis.com"
    Option(bucket.get(name)).map { blob =>
      new URL(protocol, host, s"/${bucket.getName}/${blob.getName}")
    }
  }
}
/** Defaults and factory methods for [[GcsBlobStorage]]. */
object GcsBlobStorage {

  /** Chunk size used for streamed transfers when the caller does not supply one. */
  final val DefaultChunkSize = 8192

  /** Builds a GCS client authenticated with the service-account credentials read from `key`.
    *
    * The stream is not closed here; the caller remains responsible for it.
    */
  private def newClient(key: InputStream): Storage =
    StorageOptions
      .newBuilder()
      .setCredentials(ServiceAccountCredentials.fromStream(key))
      .build()
      .getService()

  /** Creates a [[GcsBlobStorage]] for `bucketId`, authenticated with the service-account key file.
    *
    * @param keyfile   path to the service-account key file
    * @param bucketId  name of the bucket to operate on
    * @param chunkSize chunk size for streamed transfers
    * @param ec        execution context passed on to the storage instance
    */
  def fromKeyfile(keyfile: Path, bucketId: String, chunkSize: Int = DefaultChunkSize)(
      implicit ec: ExecutionContext): GcsBlobStorage = {
    val key = new FileInputStream(keyfile.toFile)
    // Close the key file explicitly once the client is built (or building it failed),
    // rather than relying on the credentials parser to do so.
    val client =
      try newClient(key)
      finally key.close()
    new GcsBlobStorage(client, bucketId, chunkSize)
  }
}