From 0e6a5d1dbaf02192cc5ad5ff6edbb793dc6a1287 Mon Sep 17 00:00:00 2001 From: vlad Date: Thu, 29 Jun 2017 20:19:13 -0700 Subject: Google pubsub util --- src/main/scala/xyz/driver/core/pubsub.scala | 84 +++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 src/main/scala/xyz/driver/core/pubsub.scala (limited to 'src/main') diff --git a/src/main/scala/xyz/driver/core/pubsub.scala b/src/main/scala/xyz/driver/core/pubsub.scala new file mode 100644 index 0000000..4853c35 --- /dev/null +++ b/src/main/scala/xyz/driver/core/pubsub.scala @@ -0,0 +1,84 @@ +package xyz.driver.core + +import com.google.api.core.{ApiFutureCallback, ApiFutures} +import com.google.cloud.pubsub.spi.v1.{AckReplyConsumer, MessageReceiver, Publisher, Subscriber} +import com.google.protobuf.ByteString +import com.google.pubsub.v1.{PubsubMessage, SubscriptionName, TopicName} +import com.typesafe.scalalogging.Logger + +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.{Failure, Try} + +object pubsub { + + trait PubsubPublisher { + + type Message + type Result + + def publish(message: Message): Future[Result] + } + + class GooglePubsubPublisher(projectId: String, topicName: String, log: Logger) extends PubsubPublisher { + + type Message = String + type Result = Id[PubsubMessage] + + private val publisher = Publisher.defaultBuilder(TopicName.create(projectId, topicName)).build() + + override def publish(message: String): Future[Id[PubsubMessage]] = { + + val data = ByteString.copyFromUtf8(message) + val pubsubMessage = PubsubMessage.newBuilder().setData(data).build() + + val promise = Promise[Id[PubsubMessage]]() + + make(publisher.publish(pubsubMessage)) { messageIdFeature => + ApiFutures.addCallback( + messageIdFeature, + new ApiFutureCallback[String]() { + override def onSuccess(messageId: String): Unit = { + log.info(s"Published a message with topic $topicName, message id $messageId: $message") + promise.complete(Try(Id[PubsubMessage](messageId))) + } + + override def onFailure(t: Throwable): Unit = { + log.warn(s"Failed to publish a message with topic $topicName: $message", t) + promise.complete(Failure(t)) + } + } + ) + } + + promise.future + } + } + + trait PubsubSubscriber { + + def stopListening(): Unit + } + + class GooglePubsubSubscriber[T](projectId: String, subscriptionId: String, receiver: String => Future[T], log: Logger)( + implicit ex: ExecutionContext) + extends PubsubSubscriber { + + private val subscriptionName = SubscriptionName.create(projectId, subscriptionId) + + private val messageReceiver = new MessageReceiver() { + override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit = { + val stringMessage = message.getData.toStringUtf8 + log.info(s"Received a message ${message.getMessageId} for subscription $subscriptionId: $stringMessage") + receiver(stringMessage).transform(v => { consumer.ack(); v }, t => { consumer.nack(); t }) + } + } + + private val subscriber = Subscriber.defaultBuilder(subscriptionName, messageReceiver).build() + + subscriber.startAsync() + + override def stopListening(): Unit = { + subscriber.stopAsync() + } + } +} -- cgit v1.2.3