aboutsummaryrefslogtreecommitdiff
path: root/core-storage/src/main/scala/xyz/driver/core/storage/GcsBlobStorage.scala
blob: 95164c78fbac1f0f126a11a365d791b5fb62562d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package xyz.driver.core.storage

import java.io.{FileInputStream, InputStream}
import java.net.URL
import java.nio.file.Path

import akka.Done
import akka.stream.scaladsl.Sink
import akka.util.ByteString
import com.google.api.gax.paging.Page
import com.google.auth.oauth2.ServiceAccountCredentials
import com.google.cloud.storage.Storage.BlobListOption
import com.google.cloud.storage.{Blob, BlobId, Bucket, Storage, StorageOptions}

import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.{ExecutionContext, Future}

class GcsBlobStorage(client: Storage, bucketId: String, chunkSize: Int = GcsBlobStorage.DefaultChunkSize)(
    implicit ec: ExecutionContext)
    extends BlobStorage with SignedBlobStorage {

  private val bucket: Bucket = client.get(bucketId)
  require(bucket != null, s"Bucket $bucketId does not exist.")

  override def uploadContent(name: String, content: Array[Byte]): Future[String] = Future {
    bucket.create(name, content).getBlobId.getName
  }

  override def uploadFile(name: String, content: Path): Future[String] = Future {
    bucket.create(name, new FileInputStream(content.toFile)).getBlobId.getName
  }

  override def exists(name: String): Future[Boolean] = Future {
    bucket.get(name) != null
  }

  override def list(prefix: String): Future[Set[String]] = Future {
    val page: Page[Blob] = bucket.list(BlobListOption.prefix(prefix))
    page
      .iterateAll()
      .asScala
      .map(_.getName())
      .toSet
  }

  override def content(name: String): Future[Option[Array[Byte]]] = Future {
    Option(bucket.get(name)).map(blob => blob.getContent())
  }

  override def download(name: String) = Future {
    Option(bucket.get(name)).map { blob =>
      ChannelStream.fromChannel(() => blob.reader(), chunkSize)
    }
  }

  override def upload(name: String): Future[Sink[ByteString, Future[Done]]] = Future {
    val blob = bucket.create(name, Array.emptyByteArray)
    ChannelStream.toChannel(() => blob.writer(), chunkSize)
  }

  override def delete(name: String): Future[String] = Future {
    client.delete(BlobId.of(bucketId, name))
    name
  }

  override def signedDownloadUrl(name: String, duration: Duration): Future[Option[URL]] = Future {
    Option(bucket.get(name)).map(blob => blob.signUrl(duration.length, duration.unit))
  }

  override def url(name: String): Future[Option[URL]] = Future {
    val protocol: String     = "https"
    val resourcePath: String = s"storage.googleapis.com/${bucket.getName}/"
    Option(bucket.get(name)).map { blob =>
      new URL(protocol, resourcePath, blob.getName)
    }
  }
}

object GcsBlobStorage {
  final val DefaultChunkSize = 8192

  private def newClient(key: InputStream): Storage =
    StorageOptions
      .newBuilder()
      .setCredentials(ServiceAccountCredentials.fromStream(key))
      .build()
      .getService()

  def fromKeyfile(keyfile: Path, bucketId: String, chunkSize: Int = DefaultChunkSize)(
      implicit ec: ExecutionContext): GcsBlobStorage = {
    val client = newClient(new FileInputStream(keyfile.toFile))
    new GcsBlobStorage(client, bucketId, chunkSize)
  }

}