aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/core/pubsub.scala
diff options
context:
space:
mode:
authorvlad <vlad@driver.xyz>2017-07-06 14:59:36 -0700
committervlad <vlad@driver.xyz>2017-07-06 14:59:36 -0700
commitd967ca5b62bd790f0ff750d2f94e3f392cc1f8ad (patch)
tree13c248d28ea2a1da3d0dbfdac485717969a4e492 /src/main/scala/xyz/driver/core/pubsub.scala
parentb3a61dff304d6bd074a97200c4c02d48d75e4e94 (diff)
downloaddriver-core-d967ca5b62bd790f0ff750d2f94e3f392cc1f8ad.tar.gz
driver-core-d967ca5b62bd790f0ff750d2f94e3f392cc1f8ad.tar.bz2
driver-core-d967ca5b62bd790f0ff750d2f94e3f392cc1f8ad.zip
Generic messages for PubSub
Diffstat (limited to 'src/main/scala/xyz/driver/core/pubsub.scala')
-rw-r--r--src/main/scala/xyz/driver/core/pubsub.scala24
1 files changed, 16 insertions, 8 deletions
diff --git a/src/main/scala/xyz/driver/core/pubsub.scala b/src/main/scala/xyz/driver/core/pubsub.scala
index 9851383..a4de51b 100644
--- a/src/main/scala/xyz/driver/core/pubsub.scala
+++ b/src/main/scala/xyz/driver/core/pubsub.scala
@@ -1,6 +1,8 @@
package xyz.driver.core
import akka.http.scaladsl.marshalling._
+import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller}
+import akka.stream.Materializer
import com.google.api.core.{ApiFutureCallback, ApiFutures}
import com.google.cloud.pubsub.spi.v1.{AckReplyConsumer, MessageReceiver, Publisher, Subscriber}
import com.google.protobuf.ByteString
@@ -21,7 +23,7 @@ object pubsub {
class GooglePubsubPublisher[Message](projectId: String, topicName: String, log: Logger)(
implicit messageMarshaller: Marshaller[Message, Array[Byte]],
- executionContext: ExecutionContext
+ ex: ExecutionContext
) extends PubsubPublisher[Message] {
type Result = Id[PubsubMessage]
@@ -63,19 +65,25 @@ object pubsub {
def stopListening(): Unit
}
- class GooglePubsubSubscriber[T](projectId: String,
- subscriptionId: String,
- receiver: String => Future[T],
- log: Logger)(implicit ex: ExecutionContext)
+ class GooglePubsubSubscriber[Message, Result](projectId: String,
+ subscriptionId: String,
+ receiver: Message => Future[Result],
+ log: Logger)(
+ implicit messageMarshaller: Unmarshaller[Array[Byte], Message],
+ mat: Materializer,
+ ex: ExecutionContext)
extends PubsubSubscriber {
private val subscriptionName = SubscriptionName.create(projectId, subscriptionId)
private val messageReceiver = new MessageReceiver() {
override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit = {
- val stringMessage = message.getData.toStringUtf8
- log.info(s"Received a message ${message.getMessageId} for subscription $subscriptionId: $stringMessage")
- receiver(stringMessage).transform(v => { consumer.ack(); v }, t => { consumer.nack(); t })
+ val messageBytes = message.getData.toByteArray
+ Unmarshal(messageBytes).to[Message].flatMap { messageBody =>
+ log.info(
+ s"Received a message ${message.getMessageId} for subscription $subscriptionId: ${new String(messageBytes)}")
+ receiver(messageBody).transform(v => { consumer.ack(); v }, t => { consumer.nack(); t })
+ }
}
}