aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorvlad <vlad@driver.xyz>2017-07-06 13:03:27 -0700
committervlad <vlad@driver.xyz>2017-07-06 13:03:27 -0700
commitb3a61dff304d6bd074a97200c4c02d48d75e4e94 (patch)
tree2d77d453424d9ade94c066b843ec3f0ebdd0478d
parentebab5c30abf29d72d0b09dc519a493396e4e9e69 (diff)
downloaddriver-core-b3a61dff304d6bd074a97200c4c02d48d75e4e94.tar.gz
driver-core-b3a61dff304d6bd074a97200c4c02d48d75e4e94.tar.bz2
driver-core-b3a61dff304d6bd074a97200c4c02d48d75e4e94.zip
Generic messages for PubSub
-rw-r--r--src/main/scala/xyz/driver/core/pubsub.scala30
1 files changed, 17 insertions, 13 deletions
diff --git a/src/main/scala/xyz/driver/core/pubsub.scala b/src/main/scala/xyz/driver/core/pubsub.scala
index ee84328..9851383 100644
--- a/src/main/scala/xyz/driver/core/pubsub.scala
+++ b/src/main/scala/xyz/driver/core/pubsub.scala
@@ -1,5 +1,6 @@
package xyz.driver.core
+import akka.http.scaladsl.marshalling._
import com.google.api.core.{ApiFutureCallback, ApiFutures}
import com.google.cloud.pubsub.spi.v1.{AckReplyConsumer, MessageReceiver, Publisher, Subscriber}
import com.google.protobuf.ByteString
@@ -11,31 +12,34 @@ import scala.util.{Failure, Try}
object pubsub {
- trait PubsubPublisher {
+ trait PubsubPublisher[Message] {
- type Message
type Result
def publish(message: Message): Future[Result]
}
- class GooglePubsubPublisher(projectId: String, topicName: String, log: Logger) extends PubsubPublisher {
+ class GooglePubsubPublisher[Message](projectId: String, topicName: String, log: Logger)(
+ implicit messageMarshaller: Marshaller[Message, Array[Byte]],
+ executionContext: ExecutionContext
+ ) extends PubsubPublisher[Message] {
- type Message = String
- type Result = Id[PubsubMessage]
+ type Result = Id[PubsubMessage]
private val publisher = Publisher.defaultBuilder(TopicName.create(projectId, topicName)).build()
- override def publish(message: String): Future[Id[PubsubMessage]] = {
+ override def publish(message: Message): Future[Id[PubsubMessage]] = {
- val data = ByteString.copyFromUtf8(message)
- val pubsubMessage = PubsubMessage.newBuilder().setData(data).build()
+ Marshal(message).to[Array[Byte]].flatMap { messageBytes =>
+ val data = ByteString.copyFrom(messageBytes)
+ val pubsubMessage = PubsubMessage.newBuilder().setData(data).build()
- val promise = Promise[Id[PubsubMessage]]()
+ val promise = Promise[Id[PubsubMessage]]()
+
+ val messageIdFuture = publisher.publish(pubsubMessage)
- make(publisher.publish(pubsubMessage)) { messageIdFeature =>
ApiFutures.addCallback(
- messageIdFeature,
+ messageIdFuture,
new ApiFutureCallback[String]() {
override def onSuccess(messageId: String): Unit = {
log.info(s"Published a message with topic $topicName, message id $messageId: $message")
@@ -48,9 +52,9 @@ object pubsub {
}
}
)
- }
- promise.future
+ promise.future
+ }
}
}