aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorvlad <vlad@driver.xyz>2017-07-06 19:01:56 -0700
committervlad <vlad@driver.xyz>2017-07-06 19:01:56 -0700
commit4bcb43f31dc2a65e2e0b5dcc07b44054ff7dc231 (patch)
tree2dd5bb1d52b8f277ef83773c392026ee7886d670
parentc83566d7e22be806a6fc490b670eb5024a0b882d (diff)
downloaddriver-core-4bcb43f31dc2a65e2e0b5dcc07b44054ff7dc231.tar.gz
driver-core-4bcb43f31dc2a65e2e0b5dcc07b44054ff7dc231.tar.bz2
driver-core-4bcb43f31dc2a65e2e0b5dcc07b44054ff7dc231.zip
Json to string marshallers for pubsub messagesv0.13.17
-rw-r--r--src/main/scala/xyz/driver/core/json.scala8
-rw-r--r--src/main/scala/xyz/driver/core/pubsub.scala22
2 files changed, 17 insertions, 13 deletions
diff --git a/src/main/scala/xyz/driver/core/json.scala b/src/main/scala/xyz/driver/core/json.scala
index b203c91..e5173de 100644
--- a/src/main/scala/xyz/driver/core/json.scala
+++ b/src/main/scala/xyz/driver/core/json.scala
@@ -4,10 +4,10 @@ import java.util.UUID
import scala.reflect.runtime.universe._
import scala.util.Try
-
import akka.http.scaladsl.model.Uri.Path
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.PathMatcher.{Matched, Unmatched}
+import akka.http.scaladsl.marshalling.{Marshaller, Marshalling}
import akka.http.scaladsl.unmarshalling.Unmarshaller
import spray.json._
import xyz.driver.core.auth.AuthCredentials
@@ -212,4 +212,10 @@ object json {
new GadtJsonFormat[T](typeField, typeValue, jsonFormat)
}
}
+
+ implicit val jsValueToStringMarshaller: Marshaller[JsValue, String] =
+ Marshaller.strict[JsValue, String](value => Marshalling.Opaque[String](() => value.compactPrint))
+
+ implicit def valueToStringMarshaller[T](implicit jsonFormat: JsonFormat[T]): Marshaller[T, String] =
+ jsValueToStringMarshaller.compose[T](jsonFormat.write)
}
diff --git a/src/main/scala/xyz/driver/core/pubsub.scala b/src/main/scala/xyz/driver/core/pubsub.scala
index a4de51b..e63217c 100644
--- a/src/main/scala/xyz/driver/core/pubsub.scala
+++ b/src/main/scala/xyz/driver/core/pubsub.scala
@@ -22,7 +22,7 @@ object pubsub {
}
class GooglePubsubPublisher[Message](projectId: String, topicName: String, log: Logger)(
- implicit messageMarshaller: Marshaller[Message, Array[Byte]],
+ implicit messageMarshaller: Marshaller[Message, String],
ex: ExecutionContext
) extends PubsubPublisher[Message] {
@@ -32,8 +32,8 @@ object pubsub {
override def publish(message: Message): Future[Id[PubsubMessage]] = {
- Marshal(message).to[Array[Byte]].flatMap { messageBytes =>
- val data = ByteString.copyFrom(messageBytes)
+ Marshal(message).to[String].flatMap { messageString =>
+ val data = ByteString.copyFromUtf8(messageString)
val pubsubMessage = PubsubMessage.newBuilder().setData(data).build()
val promise = Promise[Id[PubsubMessage]]()
@@ -44,7 +44,7 @@ object pubsub {
messageIdFuture,
new ApiFutureCallback[String]() {
override def onSuccess(messageId: String): Unit = {
- log.info(s"Published a message with topic $topicName, message id $messageId: $message")
+ log.info(s"Published a message with topic $topicName, message id $messageId: $messageString")
promise.complete(Try(Id[PubsubMessage](messageId)))
}
@@ -68,20 +68,18 @@ object pubsub {
class GooglePubsubSubscriber[Message, Result](projectId: String,
subscriptionId: String,
receiver: Message => Future[Result],
- log: Logger)(
- implicit messageMarshaller: Unmarshaller[Array[Byte], Message],
- mat: Materializer,
- ex: ExecutionContext)
+ log: Logger)(implicit messageMarshaller: Unmarshaller[String, Message],
+ mat: Materializer,
+ ex: ExecutionContext)
extends PubsubSubscriber {
private val subscriptionName = SubscriptionName.create(projectId, subscriptionId)
private val messageReceiver = new MessageReceiver() {
override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit = {
- val messageBytes = message.getData.toByteArray
- Unmarshal(messageBytes).to[Message].flatMap { messageBody =>
- log.info(
- s"Received a message ${message.getMessageId} for subscription $subscriptionId: ${new String(messageBytes)}")
+ val messageString = message.getData.toStringUtf8
+ Unmarshal(messageString).to[Message].flatMap { messageBody =>
+ log.info(s"Received a message ${message.getMessageId} for subscription $subscriptionId: $messageString")
receiver(messageBody).transform(v => { consumer.ack(); v }, t => { consumer.nack(); t })
}
}