diff options
author | vlad <vlad@driver.xyz> | 2017-07-06 13:03:27 -0700 |
---|---|---|
committer | vlad <vlad@driver.xyz> | 2017-07-06 13:03:27 -0700 |
commit | b3a61dff304d6bd074a97200c4c02d48d75e4e94 (patch) | |
tree | 2d77d453424d9ade94c066b843ec3f0ebdd0478d /src/main/scala/xyz/driver/core/pubsub.scala | |
parent | ebab5c30abf29d72d0b09dc519a493396e4e9e69 (diff) | |
download | driver-core-b3a61dff304d6bd074a97200c4c02d48d75e4e94.tar.gz driver-core-b3a61dff304d6bd074a97200c4c02d48d75e4e94.tar.bz2 driver-core-b3a61dff304d6bd074a97200c4c02d48d75e4e94.zip |
Generic messages for PubSub
Diffstat (limited to 'src/main/scala/xyz/driver/core/pubsub.scala')
-rw-r--r-- | src/main/scala/xyz/driver/core/pubsub.scala | 30 |
1 files changed, 17 insertions, 13 deletions
diff --git a/src/main/scala/xyz/driver/core/pubsub.scala b/src/main/scala/xyz/driver/core/pubsub.scala index ee84328..9851383 100644 --- a/src/main/scala/xyz/driver/core/pubsub.scala +++ b/src/main/scala/xyz/driver/core/pubsub.scala @@ -1,5 +1,6 @@ package xyz.driver.core +import akka.http.scaladsl.marshalling._ import com.google.api.core.{ApiFutureCallback, ApiFutures} import com.google.cloud.pubsub.spi.v1.{AckReplyConsumer, MessageReceiver, Publisher, Subscriber} import com.google.protobuf.ByteString @@ -11,31 +12,34 @@ import scala.util.{Failure, Try} object pubsub { - trait PubsubPublisher { + trait PubsubPublisher[Message] { - type Message type Result def publish(message: Message): Future[Result] } - class GooglePubsubPublisher(projectId: String, topicName: String, log: Logger) extends PubsubPublisher { + class GooglePubsubPublisher[Message](projectId: String, topicName: String, log: Logger)( + implicit messageMarshaller: Marshaller[Message, Array[Byte]], + executionContext: ExecutionContext + ) extends PubsubPublisher[Message] { - type Message = String - type Result = Id[PubsubMessage] + type Result = Id[PubsubMessage] private val publisher = Publisher.defaultBuilder(TopicName.create(projectId, topicName)).build() - override def publish(message: String): Future[Id[PubsubMessage]] = { + override def publish(message: Message): Future[Id[PubsubMessage]] = { - val data = ByteString.copyFromUtf8(message) - val pubsubMessage = PubsubMessage.newBuilder().setData(data).build() + Marshal(message).to[Array[Byte]].flatMap { messageBytes => + val data = ByteString.copyFrom(messageBytes) + val pubsubMessage = PubsubMessage.newBuilder().setData(data).build() - val promise = Promise[Id[PubsubMessage]]() + val promise = Promise[Id[PubsubMessage]]() + + val messageIdFuture = publisher.publish(pubsubMessage) - make(publisher.publish(pubsubMessage)) { messageIdFeature => ApiFutures.addCallback( - messageIdFeature, + messageIdFuture, new ApiFutureCallback[String]() { override def onSuccess(messageId: String): Unit = { log.info(s"Published a message with topic $topicName, message id $messageId: $message") @@ -48,9 +52,9 @@ object pubsub { } } ) - } - promise.future + promise.future + } } } |