From 0e6a5d1dbaf02192cc5ad5ff6edbb793dc6a1287 Mon Sep 17 00:00:00 2001 From: vlad Date: Thu, 29 Jun 2017 20:19:13 -0700 Subject: Google pubsub util --- src/main/scala/xyz/driver/core/pubsub.scala | 84 +++++++++++++++++++++++++++ src/test/scala/xyz/driver/core/FileTest.scala | 8 +-- 2 files changed, 88 insertions(+), 4 deletions(-) create mode 100644 src/main/scala/xyz/driver/core/pubsub.scala (limited to 'src') diff --git a/src/main/scala/xyz/driver/core/pubsub.scala b/src/main/scala/xyz/driver/core/pubsub.scala new file mode 100644 index 0000000..4853c35 --- /dev/null +++ b/src/main/scala/xyz/driver/core/pubsub.scala @@ -0,0 +1,84 @@ +package xyz.driver.core + +import com.google.api.core.{ApiFutureCallback, ApiFutures} +import com.google.cloud.pubsub.spi.v1.{AckReplyConsumer, MessageReceiver, Publisher, Subscriber} +import com.google.protobuf.ByteString +import com.google.pubsub.v1.{PubsubMessage, SubscriptionName, TopicName} +import com.typesafe.scalalogging.Logger + +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.{Failure, Try} + +object pubsub { + + trait PubsubPublisher { + + type Message + type Result + + def publish(message: Message): Future[Result] + } + + class GooglePubsubPublisher(projectId: String, topicName: String, log: Logger) extends PubsubPublisher { + + type Message = String + type Result = Id[PubsubMessage] + + private val publisher = Publisher.defaultBuilder(TopicName.create(projectId, topicName)).build() + + override def publish(message: String): Future[Id[PubsubMessage]] = { + + val data = ByteString.copyFromUtf8(message) + val pubsubMessage = PubsubMessage.newBuilder().setData(data).build() + + val promise = Promise[Id[PubsubMessage]]() + + make(publisher.publish(pubsubMessage)) { messageIdFeature => + ApiFutures.addCallback( + messageIdFeature, + new ApiFutureCallback[String]() { + override def onSuccess(messageId: String): Unit = { + log.info(s"Published a message with topic $topicName, message id $messageId: $message") + promise.complete(Try(Id[PubsubMessage](messageId))) + } + + override def onFailure(t: Throwable): Unit = { + log.warn(s"Failed to publish a message with topic $topicName: $message", t) + promise.complete(Failure(t)) + } + } + ) + } + + promise.future + } + } + + trait PubsubSubscriber { + + def stopListening(): Unit + } + + class GooglePubsubSubscriber[T](projectId: String, subscriptionId: String, receiver: String => Future[T], log: Logger)( + implicit ex: ExecutionContext) + extends PubsubSubscriber { + + private val subscriptionName = SubscriptionName.create(projectId, subscriptionId) + + private val messageReceiver = new MessageReceiver() { + override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit = { + val stringMessage = message.getData.toStringUtf8 + log.info(s"Received a message ${message.getMessageId} for subscription $subscriptionId: $stringMessage") + receiver(stringMessage).transform(v => { consumer.ack(); v }, t => { consumer.nack(); t }) + } + } + + private val subscriber = Subscriber.defaultBuilder(subscriptionName, messageReceiver).build() + + subscriber.startAsync() + + override def stopListening(): Unit = { + subscriber.stopAsync() + } + } +} diff --git a/src/test/scala/xyz/driver/core/FileTest.scala b/src/test/scala/xyz/driver/core/FileTest.scala index a1b0329..67878b4 100644 --- a/src/test/scala/xyz/driver/core/FileTest.scala +++ b/src/test/scala/xyz/driver/core/FileTest.scala @@ -117,7 +117,7 @@ class FileTest extends FlatSpec with Matchers with MockitoSugar { } "Google Cloud Storage" should "upload and download files" in { - import com.google.cloud.Page + import com.google.api.gax.paging.Page import com.google.cloud.storage.{Blob, Bucket, Storage} import Bucket.BlobWriteOption import Storage.BlobListOption @@ -146,9 +146,9 @@ class FileTest extends FlatSpec with Matchers with MockitoSugar { val gcsStorage = new GcsStorage(gcsMock, testBucket, scala.concurrent.ExecutionContext.global) when(pageMock.iterateAll()).thenReturn( - Iterator[Blob]().asJava, - Iterator[Blob](blobMock).asJava, - Iterator[Blob]().asJava + Iterable[Blob]().asJava, + Iterable[Blob](blobMock).asJava, + Iterable[Blob]().asJava ) when( gcsMock.list(testBucket.value, BlobListOption.currentDirectory(), BlobListOption.prefix(testDirPath.toString))) -- cgit v1.2.3 From ebab5c30abf29d72d0b09dc519a493396e4e9e69 Mon Sep 17 00:00:00 2001 From: vlad Date: Thu, 29 Jun 2017 20:36:49 -0700 Subject: Google pubsub util --- src/main/scala/xyz/driver/core/pubsub.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/main/scala/xyz/driver/core/pubsub.scala b/src/main/scala/xyz/driver/core/pubsub.scala index 4853c35..ee84328 100644 --- a/src/main/scala/xyz/driver/core/pubsub.scala +++ b/src/main/scala/xyz/driver/core/pubsub.scala @@ -59,9 +59,11 @@ object pubsub { def stopListening(): Unit } - class GooglePubsubSubscriber[T](projectId: String, subscriptionId: String, receiver: String => Future[T], log: Logger)( - implicit ex: ExecutionContext) - extends PubsubSubscriber { + class GooglePubsubSubscriber[T](projectId: String, + subscriptionId: String, + receiver: String => Future[T], + log: Logger)(implicit ex: ExecutionContext) + extends PubsubSubscriber { private val subscriptionName = SubscriptionName.create(projectId, subscriptionId) -- cgit v1.2.3 From b3a61dff304d6bd074a97200c4c02d48d75e4e94 Mon Sep 17 00:00:00 2001 From: vlad Date: Thu, 6 Jul 2017 13:03:27 -0700 Subject: Generic messages for PubSub --- src/main/scala/xyz/driver/core/pubsub.scala | 30 ++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) (limited to 'src') diff --git a/src/main/scala/xyz/driver/core/pubsub.scala b/src/main/scala/xyz/driver/core/pubsub.scala index ee84328..9851383 100644 --- a/src/main/scala/xyz/driver/core/pubsub.scala +++ b/src/main/scala/xyz/driver/core/pubsub.scala @@ -1,5 +1,6 @@ package xyz.driver.core +import akka.http.scaladsl.marshalling._ import com.google.api.core.{ApiFutureCallback, ApiFutures} import com.google.cloud.pubsub.spi.v1.{AckReplyConsumer, MessageReceiver, Publisher, Subscriber} import com.google.protobuf.ByteString @@ -11,31 +12,34 @@ import scala.util.{Failure, Try} object pubsub { - trait PubsubPublisher { + trait PubsubPublisher[Message] { - type Message type Result def publish(message: Message): Future[Result] } - class GooglePubsubPublisher(projectId: String, topicName: String, log: Logger) extends PubsubPublisher { + class GooglePubsubPublisher[Message](projectId: String, topicName: String, log: Logger)( + implicit messageMarshaller: Marshaller[Message, Array[Byte]], + executionContext: ExecutionContext + ) extends PubsubPublisher[Message] { - type Message = String - type Result = Id[PubsubMessage] + type Result = Id[PubsubMessage] private val publisher = Publisher.defaultBuilder(TopicName.create(projectId, topicName)).build() - override def publish(message: String): Future[Id[PubsubMessage]] = { + override def publish(message: Message): Future[Id[PubsubMessage]] = { - val data = ByteString.copyFromUtf8(message) - val pubsubMessage = PubsubMessage.newBuilder().setData(data).build() + Marshal(message).to[Array[Byte]].flatMap { messageBytes => + val data = ByteString.copyFrom(messageBytes) + val pubsubMessage = PubsubMessage.newBuilder().setData(data).build() - val promise = Promise[Id[PubsubMessage]]() + val promise = Promise[Id[PubsubMessage]]() + + val messageIdFuture = publisher.publish(pubsubMessage) - make(publisher.publish(pubsubMessage)) { messageIdFeature => ApiFutures.addCallback( - messageIdFeature, + messageIdFuture, new ApiFutureCallback[String]() { override def onSuccess(messageId: String): Unit = { log.info(s"Published a message with topic $topicName, message id $messageId: $message") @@ -48,9 +52,9 @@ object pubsub { } } ) - } - promise.future + promise.future + } } } -- cgit v1.2.3 From d967ca5b62bd790f0ff750d2f94e3f392cc1f8ad Mon Sep 17 00:00:00 2001 From: vlad Date: Thu, 6 Jul 2017 14:59:36 -0700 Subject: Generic messages for PubSub --- src/main/scala/xyz/driver/core/pubsub.scala | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) (limited to 'src') diff --git a/src/main/scala/xyz/driver/core/pubsub.scala b/src/main/scala/xyz/driver/core/pubsub.scala index 9851383..a4de51b 100644 --- a/src/main/scala/xyz/driver/core/pubsub.scala +++ b/src/main/scala/xyz/driver/core/pubsub.scala @@ -1,6 +1,8 @@ package xyz.driver.core import akka.http.scaladsl.marshalling._ +import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller} +import akka.stream.Materializer import com.google.api.core.{ApiFutureCallback, ApiFutures} import com.google.cloud.pubsub.spi.v1.{AckReplyConsumer, MessageReceiver, Publisher, Subscriber} import com.google.protobuf.ByteString @@ -21,7 +23,7 @@ object pubsub { class GooglePubsubPublisher[Message](projectId: String, topicName: String, log: Logger)( implicit messageMarshaller: Marshaller[Message, Array[Byte]], - executionContext: ExecutionContext + ex: ExecutionContext ) extends PubsubPublisher[Message] { type Result = Id[PubsubMessage] @@ -63,19 +65,25 @@ object pubsub { def stopListening(): Unit } - class GooglePubsubSubscriber[T](projectId: String, - subscriptionId: String, - receiver: String => Future[T], - log: Logger)(implicit ex: ExecutionContext) + class GooglePubsubSubscriber[Message, Result](projectId: String, + subscriptionId: String, + receiver: Message => Future[Result], + log: Logger)( + implicit messageMarshaller: Unmarshaller[Array[Byte], Message], + mat: Materializer, + ex: ExecutionContext) extends PubsubSubscriber { private val subscriptionName = SubscriptionName.create(projectId, subscriptionId) private val messageReceiver = new MessageReceiver() { override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit = { - val stringMessage = message.getData.toStringUtf8 - log.info(s"Received a message ${message.getMessageId} for subscription $subscriptionId: $stringMessage") - receiver(stringMessage).transform(v => { consumer.ack(); v }, t => { consumer.nack(); t }) + val messageBytes = message.getData.toByteArray + Unmarshal(messageBytes).to[Message].flatMap { messageBody => + log.info( + s"Received a message ${message.getMessageId} for subscription $subscriptionId: ${new String(messageBytes)}") + receiver(messageBody).transform(v => { consumer.ack(); v }, t => { consumer.nack(); t }) + } } } -- cgit v1.2.3