aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--build.sbt3
-rw-r--r--src/main/scala/xyz/driver/core/pubsub.scala84
-rw-r--r--src/test/scala/xyz/driver/core/FileTest.scala8
3 files changed, 90 insertions, 5 deletions
diff --git a/build.sbt b/build.sbt
index 0963f6c..1de7260 100644
--- a/build.sbt
+++ b/build.sbt
@@ -16,7 +16,8 @@ lazy val core = (project in file("."))
"org.mockito" % "mockito-core" % "1.9.5" % "test",
"com.github.swagger-akka-http" %% "swagger-akka-http" % "0.9.1",
"com.amazonaws" % "aws-java-sdk-s3" % "1.11.26",
- "com.google.cloud" % "google-cloud-storage" % "0.9.4-beta",
+ "com.google.cloud" % "google-cloud-pubsub" % "0.17.2-alpha",
+ "com.google.cloud" % "google-cloud-storage" % "1.0.1",
"com.typesafe.slick" %% "slick" % "3.1.1",
"com.typesafe" % "config" % "1.2.1",
"com.typesafe.scala-logging" %% "scala-logging" % "3.4.0",
diff --git a/src/main/scala/xyz/driver/core/pubsub.scala b/src/main/scala/xyz/driver/core/pubsub.scala
new file mode 100644
index 0000000..4853c35
--- /dev/null
+++ b/src/main/scala/xyz/driver/core/pubsub.scala
@@ -0,0 +1,84 @@
+package xyz.driver.core
+
+import com.google.api.core.{ApiFutureCallback, ApiFutures}
+import com.google.cloud.pubsub.spi.v1.{AckReplyConsumer, MessageReceiver, Publisher, Subscriber}
+import com.google.protobuf.ByteString
+import com.google.pubsub.v1.{PubsubMessage, SubscriptionName, TopicName}
+import com.typesafe.scalalogging.Logger
+
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Try}
+
+object pubsub {
+
+ trait PubsubPublisher {
+
+ type Message
+ type Result
+
+ def publish(message: Message): Future[Result]
+ }
+
+ class GooglePubsubPublisher(projectId: String, topicName: String, log: Logger) extends PubsubPublisher {
+
+ type Message = String
+ type Result = Id[PubsubMessage]
+
+ private val publisher = Publisher.defaultBuilder(TopicName.create(projectId, topicName)).build()
+
+ override def publish(message: String): Future[Id[PubsubMessage]] = {
+
+ val data = ByteString.copyFromUtf8(message)
+ val pubsubMessage = PubsubMessage.newBuilder().setData(data).build()
+
+ val promise = Promise[Id[PubsubMessage]]()
+
+ make(publisher.publish(pubsubMessage)) { messageIdFeature =>
+ ApiFutures.addCallback(
+ messageIdFeature,
+ new ApiFutureCallback[String]() {
+ override def onSuccess(messageId: String): Unit = {
+ log.info(s"Published a message with topic $topicName, message id $messageId: $message")
+ promise.complete(Try(Id[PubsubMessage](messageId)))
+ }
+
+ override def onFailure(t: Throwable): Unit = {
+ log.warn(s"Failed to publish a message with topic $topicName: $message", t)
+ promise.complete(Failure(t))
+ }
+ }
+ )
+ }
+
+ promise.future
+ }
+ }
+
+ trait PubsubSubscriber {
+
+ def stopListening(): Unit
+ }
+
+ class GooglePubsubSubscriber[T](projectId: String, subscriptionId: String, receiver: String => Future[T], log: Logger)(
+ implicit ex: ExecutionContext)
+ extends PubsubSubscriber {
+
+ private val subscriptionName = SubscriptionName.create(projectId, subscriptionId)
+
+ private val messageReceiver = new MessageReceiver() {
+ override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit = {
+ val stringMessage = message.getData.toStringUtf8
+ log.info(s"Received a message ${message.getMessageId} for subscription $subscriptionId: $stringMessage")
+ receiver(stringMessage).transform(v => { consumer.ack(); v }, t => { consumer.nack(); t })
+ }
+ }
+
+ private val subscriber = Subscriber.defaultBuilder(subscriptionName, messageReceiver).build()
+
+ subscriber.startAsync()
+
+ override def stopListening(): Unit = {
+ subscriber.stopAsync()
+ }
+ }
+}
diff --git a/src/test/scala/xyz/driver/core/FileTest.scala b/src/test/scala/xyz/driver/core/FileTest.scala
index a1b0329..67878b4 100644
--- a/src/test/scala/xyz/driver/core/FileTest.scala
+++ b/src/test/scala/xyz/driver/core/FileTest.scala
@@ -117,7 +117,7 @@ class FileTest extends FlatSpec with Matchers with MockitoSugar {
}
"Google Cloud Storage" should "upload and download files" in {
- import com.google.cloud.Page
+ import com.google.api.gax.paging.Page
import com.google.cloud.storage.{Blob, Bucket, Storage}
import Bucket.BlobWriteOption
import Storage.BlobListOption
@@ -146,9 +146,9 @@ class FileTest extends FlatSpec with Matchers with MockitoSugar {
val gcsStorage = new GcsStorage(gcsMock, testBucket, scala.concurrent.ExecutionContext.global)
when(pageMock.iterateAll()).thenReturn(
- Iterator[Blob]().asJava,
- Iterator[Blob](blobMock).asJava,
- Iterator[Blob]().asJava
+ Iterable[Blob]().asJava,
+ Iterable[Blob](blobMock).asJava,
+ Iterable[Blob]().asJava
)
when(
gcsMock.list(testBucket.value, BlobListOption.currentDirectory(), BlobListOption.prefix(testDirPath.toString)))