From d967ca5b62bd790f0ff750d2f94e3f392cc1f8ad Mon Sep 17 00:00:00 2001 From: vlad Date: Thu, 6 Jul 2017 14:59:36 -0700 Subject: Generic messages for PubSub --- src/main/scala/xyz/driver/core/pubsub.scala | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) (limited to 'src/main/scala/xyz') diff --git a/src/main/scala/xyz/driver/core/pubsub.scala b/src/main/scala/xyz/driver/core/pubsub.scala index 9851383..a4de51b 100644 --- a/src/main/scala/xyz/driver/core/pubsub.scala +++ b/src/main/scala/xyz/driver/core/pubsub.scala @@ -1,6 +1,8 @@ package xyz.driver.core import akka.http.scaladsl.marshalling._ +import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller} +import akka.stream.Materializer import com.google.api.core.{ApiFutureCallback, ApiFutures} import com.google.cloud.pubsub.spi.v1.{AckReplyConsumer, MessageReceiver, Publisher, Subscriber} import com.google.protobuf.ByteString @@ -21,7 +23,7 @@ object pubsub { class GooglePubsubPublisher[Message](projectId: String, topicName: String, log: Logger)( implicit messageMarshaller: Marshaller[Message, Array[Byte]], - executionContext: ExecutionContext + ex: ExecutionContext ) extends PubsubPublisher[Message] { type Result = Id[PubsubMessage] @@ -63,19 +65,25 @@ object pubsub { def stopListening(): Unit } - class GooglePubsubSubscriber[T](projectId: String, - subscriptionId: String, - receiver: String => Future[T], - log: Logger)(implicit ex: ExecutionContext) + class GooglePubsubSubscriber[Message, Result](projectId: String, + subscriptionId: String, + receiver: Message => Future[Result], + log: Logger)( + implicit messageMarshaller: Unmarshaller[Array[Byte], Message], + mat: Materializer, + ex: ExecutionContext) extends PubsubSubscriber { private val subscriptionName = SubscriptionName.create(projectId, subscriptionId) private val messageReceiver = new MessageReceiver() { override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit = { - val stringMessage = message.getData.toStringUtf8 - log.info(s"Received a message ${message.getMessageId} for subscription $subscriptionId: $stringMessage") - receiver(stringMessage).transform(v => { consumer.ack(); v }, t => { consumer.nack(); t }) + val messageBytes = message.getData.toByteArray + Unmarshal(messageBytes).to[Message].flatMap { messageBody => + log.info( + s"Received a message ${message.getMessageId} for subscription $subscriptionId: ${new String(messageBytes)}") + receiver(messageBody).transform(v => { consumer.ack(); v }, t => { consumer.nack(); t }) + } } } -- cgit v1.2.3