diff options
author | vlad <vlad@driver.xyz> | 2017-06-29 20:19:13 -0700 |
---|---|---|
committer | vlad <vlad@driver.xyz> | 2017-06-29 20:19:13 -0700 |
commit | 0e6a5d1dbaf02192cc5ad5ff6edbb793dc6a1287 (patch) | |
tree | d5ba438f17cfb59d01dc48544a98138e5c37d1ff | |
parent | 38b7278027e81c97afde27fb48eefde3c6e254bd (diff) | |
download | driver-core-0e6a5d1dbaf02192cc5ad5ff6edbb793dc6a1287.tar.gz driver-core-0e6a5d1dbaf02192cc5ad5ff6edbb793dc6a1287.tar.bz2 driver-core-0e6a5d1dbaf02192cc5ad5ff6edbb793dc6a1287.zip |
Google pubsub util
-rw-r--r-- | build.sbt | 3 | ||||
-rw-r--r-- | src/main/scala/xyz/driver/core/pubsub.scala | 84 | ||||
-rw-r--r-- | src/test/scala/xyz/driver/core/FileTest.scala | 8 |
3 files changed, 90 insertions, 5 deletions
@@ -16,7 +16,8 @@ lazy val core = (project in file(".")) "org.mockito" % "mockito-core" % "1.9.5" % "test", "com.github.swagger-akka-http" %% "swagger-akka-http" % "0.9.1", "com.amazonaws" % "aws-java-sdk-s3" % "1.11.26", - "com.google.cloud" % "google-cloud-storage" % "0.9.4-beta", + "com.google.cloud" % "google-cloud-pubsub" % "0.17.2-alpha", + "com.google.cloud" % "google-cloud-storage" % "1.0.1", "com.typesafe.slick" %% "slick" % "3.1.1", "com.typesafe" % "config" % "1.2.1", "com.typesafe.scala-logging" %% "scala-logging" % "3.4.0", diff --git a/src/main/scala/xyz/driver/core/pubsub.scala b/src/main/scala/xyz/driver/core/pubsub.scala new file mode 100644 index 0000000..4853c35 --- /dev/null +++ b/src/main/scala/xyz/driver/core/pubsub.scala @@ -0,0 +1,84 @@ +package xyz.driver.core + +import com.google.api.core.{ApiFutureCallback, ApiFutures} +import com.google.cloud.pubsub.spi.v1.{AckReplyConsumer, MessageReceiver, Publisher, Subscriber} +import com.google.protobuf.ByteString +import com.google.pubsub.v1.{PubsubMessage, SubscriptionName, TopicName} +import com.typesafe.scalalogging.Logger + +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.{Failure, Try} + +object pubsub { + + trait PubsubPublisher { + + type Message + type Result + + def publish(message: Message): Future[Result] + } + + class GooglePubsubPublisher(projectId: String, topicName: String, log: Logger) extends PubsubPublisher { + + type Message = String + type Result = Id[PubsubMessage] + + private val publisher = Publisher.defaultBuilder(TopicName.create(projectId, topicName)).build() + + override def publish(message: String): Future[Id[PubsubMessage]] = { + + val data = ByteString.copyFromUtf8(message) + val pubsubMessage = PubsubMessage.newBuilder().setData(data).build() + + val promise = Promise[Id[PubsubMessage]]() + + make(publisher.publish(pubsubMessage)) { messageIdFeature => + ApiFutures.addCallback( + messageIdFeature, + new ApiFutureCallback[String]() { + override def onSuccess(messageId: String): Unit = { + log.info(s"Published a message with topic $topicName, message id $messageId: $message") + promise.complete(Try(Id[PubsubMessage](messageId))) + } + + override def onFailure(t: Throwable): Unit = { + log.warn(s"Failed to publish a message with topic $topicName: $message", t) + promise.complete(Failure(t)) + } + } + ) + } + + promise.future + } + } + + trait PubsubSubscriber { + + def stopListening(): Unit + } + + class GooglePubsubSubscriber[T](projectId: String, subscriptionId: String, receiver: String => Future[T], log: Logger)( + implicit ex: ExecutionContext) + extends PubsubSubscriber { + + private val subscriptionName = SubscriptionName.create(projectId, subscriptionId) + + private val messageReceiver = new MessageReceiver() { + override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit = { + val stringMessage = message.getData.toStringUtf8 + log.info(s"Received a message ${message.getMessageId} for subscription $subscriptionId: $stringMessage") + receiver(stringMessage).transform(v => { consumer.ack(); v }, t => { consumer.nack(); t }) + } + } + + private val subscriber = Subscriber.defaultBuilder(subscriptionName, messageReceiver).build() + + subscriber.startAsync() + + override def stopListening(): Unit = { + subscriber.stopAsync() + } + } +} diff --git a/src/test/scala/xyz/driver/core/FileTest.scala b/src/test/scala/xyz/driver/core/FileTest.scala index a1b0329..67878b4 100644 --- a/src/test/scala/xyz/driver/core/FileTest.scala +++ b/src/test/scala/xyz/driver/core/FileTest.scala @@ -117,7 +117,7 @@ class FileTest extends FlatSpec with Matchers with MockitoSugar { } "Google Cloud Storage" should "upload and download files" in { - import com.google.cloud.Page + import com.google.api.gax.paging.Page import com.google.cloud.storage.{Blob, Bucket, Storage} import Bucket.BlobWriteOption import Storage.BlobListOption @@ -146,9 +146,9 @@ class FileTest extends FlatSpec with Matchers with MockitoSugar { val gcsStorage = new GcsStorage(gcsMock, testBucket, scala.concurrent.ExecutionContext.global) when(pageMock.iterateAll()).thenReturn( - Iterator[Blob]().asJava, - Iterator[Blob](blobMock).asJava, - Iterator[Blob]().asJava + Iterable[Blob]().asJava, + Iterable[Blob](blobMock).asJava, + Iterable[Blob]().asJava ) when( gcsMock.list(testBucket.value, BlobListOption.currentDirectory(), BlobListOption.prefix(testDirPath.toString))) |