diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/main/scala/xyz/driver/core/pubsub.scala | 30 |
1 files changed, 17 insertions, 13 deletions
diff --git a/src/main/scala/xyz/driver/core/pubsub.scala b/src/main/scala/xyz/driver/core/pubsub.scala index ee84328..9851383 100644 --- a/src/main/scala/xyz/driver/core/pubsub.scala +++ b/src/main/scala/xyz/driver/core/pubsub.scala @@ -1,5 +1,6 @@ package xyz.driver.core +import akka.http.scaladsl.marshalling._ import com.google.api.core.{ApiFutureCallback, ApiFutures} import com.google.cloud.pubsub.spi.v1.{AckReplyConsumer, MessageReceiver, Publisher, Subscriber} import com.google.protobuf.ByteString @@ -11,31 +12,34 @@ import scala.util.{Failure, Try} object pubsub { - trait PubsubPublisher { + trait PubsubPublisher[Message] { - type Message type Result def publish(message: Message): Future[Result] } - class GooglePubsubPublisher(projectId: String, topicName: String, log: Logger) extends PubsubPublisher { + class GooglePubsubPublisher[Message](projectId: String, topicName: String, log: Logger)( + implicit messageMarshaller: Marshaller[Message, Array[Byte]], + executionContext: ExecutionContext + ) extends PubsubPublisher[Message] { - type Message = String - type Result = Id[PubsubMessage] + type Result = Id[PubsubMessage] private val publisher = Publisher.defaultBuilder(TopicName.create(projectId, topicName)).build() - override def publish(message: String): Future[Id[PubsubMessage]] = { + override def publish(message: Message): Future[Id[PubsubMessage]] = { - val data = ByteString.copyFromUtf8(message) - val pubsubMessage = PubsubMessage.newBuilder().setData(data).build() + Marshal(message).to[Array[Byte]].flatMap { messageBytes => + val data = ByteString.copyFrom(messageBytes) + val pubsubMessage = PubsubMessage.newBuilder().setData(data).build() - val promise = Promise[Id[PubsubMessage]]() + val promise = Promise[Id[PubsubMessage]]() + + val messageIdFuture = publisher.publish(pubsubMessage) - make(publisher.publish(pubsubMessage)) { messageIdFeature => ApiFutures.addCallback( - messageIdFeature, + messageIdFuture, new ApiFutureCallback[String]() { override def onSuccess(messageId: String): Unit = { log.info(s"Published a message with topic $topicName, message id $messageId: $message") @@ -48,9 +52,9 @@ object pubsub { } } ) - } - promise.future + promise.future + } } } |