From 4bcb43f31dc2a65e2e0b5dcc07b44054ff7dc231 Mon Sep 17 00:00:00 2001 From: vlad Date: Thu, 6 Jul 2017 19:01:56 -0700 Subject: Json to string marshallers for pubsub messages --- src/main/scala/xyz/driver/core/json.scala | 8 +++++++- src/main/scala/xyz/driver/core/pubsub.scala | 22 ++++++++++------------ 2 files changed, 17 insertions(+), 13 deletions(-) (limited to 'src/main') diff --git a/src/main/scala/xyz/driver/core/json.scala b/src/main/scala/xyz/driver/core/json.scala index b203c91..e5173de 100644 --- a/src/main/scala/xyz/driver/core/json.scala +++ b/src/main/scala/xyz/driver/core/json.scala @@ -4,10 +4,10 @@ import java.util.UUID import scala.reflect.runtime.universe._ import scala.util.Try - import akka.http.scaladsl.model.Uri.Path import akka.http.scaladsl.server._ import akka.http.scaladsl.server.PathMatcher.{Matched, Unmatched} +import akka.http.scaladsl.marshalling.{Marshaller, Marshalling} import akka.http.scaladsl.unmarshalling.Unmarshaller import spray.json._ import xyz.driver.core.auth.AuthCredentials @@ -212,4 +212,10 @@ object json { new GadtJsonFormat[T](typeField, typeValue, jsonFormat) } } + + implicit val jsValueToStringMarshaller: Marshaller[JsValue, String] = + Marshaller.strict[JsValue, String](value => Marshalling.Opaque[String](() => value.compactPrint)) + + implicit def valueToStringMarshaller[T](implicit jsonFormat: JsonFormat[T]): Marshaller[T, String] = + jsValueToStringMarshaller.compose[T](jsonFormat.write) } diff --git a/src/main/scala/xyz/driver/core/pubsub.scala b/src/main/scala/xyz/driver/core/pubsub.scala index a4de51b..e63217c 100644 --- a/src/main/scala/xyz/driver/core/pubsub.scala +++ b/src/main/scala/xyz/driver/core/pubsub.scala @@ -22,7 +22,7 @@ object pubsub { } class GooglePubsubPublisher[Message](projectId: String, topicName: String, log: Logger)( - implicit messageMarshaller: Marshaller[Message, Array[Byte]], + implicit messageMarshaller: Marshaller[Message, String], ex: ExecutionContext ) extends PubsubPublisher[Message] { @@ -32,8 +32,8 @@ object pubsub { override def publish(message: Message): Future[Id[PubsubMessage]] = { - Marshal(message).to[Array[Byte]].flatMap { messageBytes => - val data = ByteString.copyFrom(messageBytes) + Marshal(message).to[String].flatMap { messageString => + val data = ByteString.copyFromUtf8(messageString) val pubsubMessage = PubsubMessage.newBuilder().setData(data).build() val promise = Promise[Id[PubsubMessage]]() @@ -44,7 +44,7 @@ object pubsub { messageIdFuture, new ApiFutureCallback[String]() { override def onSuccess(messageId: String): Unit = { - log.info(s"Published a message with topic $topicName, message id $messageId: $message") + log.info(s"Published a message with topic $topicName, message id $messageId: $messageString") promise.complete(Try(Id[PubsubMessage](messageId))) } @@ -68,20 +68,18 @@ object pubsub { class GooglePubsubSubscriber[Message, Result](projectId: String, subscriptionId: String, receiver: Message => Future[Result], - log: Logger)( - implicit messageMarshaller: Unmarshaller[Array[Byte], Message], - mat: Materializer, - ex: ExecutionContext) + log: Logger)(implicit messageMarshaller: Unmarshaller[String, Message], + mat: Materializer, + ex: ExecutionContext) extends PubsubSubscriber { private val subscriptionName = SubscriptionName.create(projectId, subscriptionId) private val messageReceiver = new MessageReceiver() { override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit = { - val messageBytes = message.getData.toByteArray - Unmarshal(messageBytes).to[Message].flatMap { messageBody => - log.info( - s"Received a message ${message.getMessageId} for subscription $subscriptionId: ${new String(messageBytes)}") + val messageString = message.getData.toStringUtf8 + Unmarshal(messageString).to[Message].flatMap { messageBody => + log.info(s"Received a message ${message.getMessageId} for subscription $subscriptionId: $messageString") receiver(messageBody).transform(v => { consumer.ack(); v }, t => { consumer.nack(); t }) } } -- cgit v1.2.3