From 4bcb43f31dc2a65e2e0b5dcc07b44054ff7dc231 Mon Sep 17 00:00:00 2001 From: vlad Date: Thu, 6 Jul 2017 19:01:56 -0700 Subject: Json to string marshallers for pubsub messages --- src/main/scala/xyz/driver/core/pubsub.scala | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) (limited to 'src/main/scala/xyz/driver/core/pubsub.scala') diff --git a/src/main/scala/xyz/driver/core/pubsub.scala b/src/main/scala/xyz/driver/core/pubsub.scala index a4de51b..e63217c 100644 --- a/src/main/scala/xyz/driver/core/pubsub.scala +++ b/src/main/scala/xyz/driver/core/pubsub.scala @@ -22,7 +22,7 @@ object pubsub { } class GooglePubsubPublisher[Message](projectId: String, topicName: String, log: Logger)( - implicit messageMarshaller: Marshaller[Message, Array[Byte]], + implicit messageMarshaller: Marshaller[Message, String], ex: ExecutionContext ) extends PubsubPublisher[Message] { @@ -32,8 +32,8 @@ object pubsub { override def publish(message: Message): Future[Id[PubsubMessage]] = { - Marshal(message).to[Array[Byte]].flatMap { messageBytes => - val data = ByteString.copyFrom(messageBytes) + Marshal(message).to[String].flatMap { messageString => + val data = ByteString.copyFromUtf8(messageString) val pubsubMessage = PubsubMessage.newBuilder().setData(data).build() val promise = Promise[Id[PubsubMessage]]() @@ -44,7 +44,7 @@ object pubsub { messageIdFuture, new ApiFutureCallback[String]() { override def onSuccess(messageId: String): Unit = { - log.info(s"Published a message with topic $topicName, message id $messageId: $message") + log.info(s"Published a message with topic $topicName, message id $messageId: $messageString") promise.complete(Try(Id[PubsubMessage](messageId))) } @@ -68,20 +68,18 @@ object pubsub { class GooglePubsubSubscriber[Message, Result](projectId: String, subscriptionId: String, receiver: Message => Future[Result], - log: Logger)( - implicit messageMarshaller: Unmarshaller[Array[Byte], Message], - mat: Materializer, - ex: ExecutionContext) + log: Logger)(implicit messageMarshaller: Unmarshaller[String, Message], + mat: Materializer, + ex: ExecutionContext) extends PubsubSubscriber { private val subscriptionName = SubscriptionName.create(projectId, subscriptionId) private val messageReceiver = new MessageReceiver() { override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit = { - val messageBytes = message.getData.toByteArray - Unmarshal(messageBytes).to[Message].flatMap { messageBody => - log.info( - s"Received a message ${message.getMessageId} for subscription $subscriptionId: ${new String(messageBytes)}") + val messageString = message.getData.toStringUtf8 + Unmarshal(messageString).to[Message].flatMap { messageBody => + log.info(s"Received a message ${message.getMessageId} for subscription $subscriptionId: $messageString") receiver(messageBody).transform(v => { consumer.ack(); v }, t => { consumer.nack(); t }) } } -- cgit v1.2.3