From b3a61dff304d6bd074a97200c4c02d48d75e4e94 Mon Sep 17 00:00:00 2001 From: vlad Date: Thu, 6 Jul 2017 13:03:27 -0700 Subject: Generic messages for PubSub --- src/main/scala/xyz/driver/core/pubsub.scala | 30 ++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) (limited to 'src/main/scala') diff --git a/src/main/scala/xyz/driver/core/pubsub.scala b/src/main/scala/xyz/driver/core/pubsub.scala index ee84328..9851383 100644 --- a/src/main/scala/xyz/driver/core/pubsub.scala +++ b/src/main/scala/xyz/driver/core/pubsub.scala @@ -1,5 +1,6 @@ package xyz.driver.core +import akka.http.scaladsl.marshalling._ import com.google.api.core.{ApiFutureCallback, ApiFutures} import com.google.cloud.pubsub.spi.v1.{AckReplyConsumer, MessageReceiver, Publisher, Subscriber} import com.google.protobuf.ByteString @@ -11,31 +12,34 @@ import scala.util.{Failure, Try} object pubsub { - trait PubsubPublisher { + trait PubsubPublisher[Message] { - type Message type Result def publish(message: Message): Future[Result] } - class GooglePubsubPublisher(projectId: String, topicName: String, log: Logger) extends PubsubPublisher { + class GooglePubsubPublisher[Message](projectId: String, topicName: String, log: Logger)( + implicit messageMarshaller: Marshaller[Message, Array[Byte]], + executionContext: ExecutionContext + ) extends PubsubPublisher[Message] { - type Message = String - type Result = Id[PubsubMessage] + type Result = Id[PubsubMessage] private val publisher = Publisher.defaultBuilder(TopicName.create(projectId, topicName)).build() - override def publish(message: String): Future[Id[PubsubMessage]] = { + override def publish(message: Message): Future[Id[PubsubMessage]] = { - val data = ByteString.copyFromUtf8(message) - val pubsubMessage = PubsubMessage.newBuilder().setData(data).build() + Marshal(message).to[Array[Byte]].flatMap { messageBytes => + val data = ByteString.copyFrom(messageBytes) + val pubsubMessage = PubsubMessage.newBuilder().setData(data).build() - val promise = Promise[Id[PubsubMessage]]() + val promise = Promise[Id[PubsubMessage]]() + + val messageIdFuture = publisher.publish(pubsubMessage) - make(publisher.publish(pubsubMessage)) { messageIdFeature => ApiFutures.addCallback( - messageIdFeature, + messageIdFuture, new ApiFutureCallback[String]() { override def onSuccess(messageId: String): Unit = { log.info(s"Published a message with topic $topicName, message id $messageId: $message") @@ -48,9 +52,9 @@ object pubsub { } } ) - } - promise.future + promise.future + } } } -- cgit v1.2.3