diff options
Diffstat (limited to 'jvm/src/main/scala/xyz/driver/core/pubsub.scala')
-rw-r--r-- | jvm/src/main/scala/xyz/driver/core/pubsub.scala | 145 |
1 files changed, 145 insertions, 0 deletions
diff --git a/jvm/src/main/scala/xyz/driver/core/pubsub.scala b/jvm/src/main/scala/xyz/driver/core/pubsub.scala new file mode 100644 index 0000000..6d2667f --- /dev/null +++ b/jvm/src/main/scala/xyz/driver/core/pubsub.scala @@ -0,0 +1,145 @@ +package xyz.driver.core + +import akka.http.scaladsl.marshalling._ +import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller} +import akka.stream.Materializer +import com.google.api.core.{ApiFutureCallback, ApiFutures} +import com.google.cloud.pubsub.v1._ +import com.google.protobuf.ByteString +import com.google.pubsub.v1._ +import com.typesafe.scalalogging.Logger + +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.{Failure, Try} + +object pubsub { + + trait PubsubPublisher[Message] { + + type Result + + def publish(message: Message): Future[Result] + } + + class GooglePubsubPublisher[Message](projectId: String, topic: String, log: Logger, autoCreate: Boolean = true)( + implicit messageMarshaller: Marshaller[Message, String], + ex: ExecutionContext + ) extends PubsubPublisher[Message] { + + type Result = Id[PubsubMessage] + + private val topicName = ProjectTopicName.of(projectId, topic) + + private val publisher = { + if (autoCreate) { + val adminClient = TopicAdminClient.create() + val topicExists = Try(adminClient.getTopic(topicName)).isSuccess + if (!topicExists) { + adminClient.createTopic(topicName) + } + } + Publisher.newBuilder(topicName).build() + } + + override def publish(message: Message): Future[Id[PubsubMessage]] = { + + Marshal(message).to[String].flatMap { messageString => + val data = ByteString.copyFromUtf8(messageString) + val pubsubMessage = PubsubMessage.newBuilder().setData(data).build() + + val promise = Promise[Id[PubsubMessage]]() + + val messageIdFuture = publisher.publish(pubsubMessage) + + ApiFutures.addCallback( + messageIdFuture, + new ApiFutureCallback[String]() { + override def onSuccess(messageId: String): Unit = { + log.info(s"Published a message with topic $topic, message id $messageId: $messageString") + promise.complete(Try(Id[PubsubMessage](messageId))) + } + + override def onFailure(t: Throwable): Unit = { + log.warn(s"Failed to publish a message with topic $topic: $message", t) + promise.complete(Failure(t)) + } + } + ) + + promise.future + } + } + } + + class FakePubsubPublisher[Message](topicName: String, log: Logger)( + implicit messageMarshaller: Marshaller[Message, String], + ex: ExecutionContext) + extends PubsubPublisher[Message] { + + type Result = Id[PubsubMessage] + + def publish(message: Message): Future[Result] = + Marshal(message).to[String].map { messageString => + log.info(s"Published a message to a fake pubsub with topic $topicName: $messageString") + generators.nextId[PubsubMessage]() + } + } + + trait PubsubSubscriber { + + def stopListening(): Unit + } + + class GooglePubsubSubscriber[Message]( + projectId: String, + subscriptionId: String, + receiver: Message => Future[Unit], + log: Logger, + autoCreateSettings: Option[GooglePubsubSubscriber.SubscriptionSettings] = None + )(implicit messageMarshaller: Unmarshaller[String, Message], mat: Materializer, ex: ExecutionContext) + extends PubsubSubscriber { + + private val subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId) + + private val messageReceiver = new MessageReceiver() { + override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit = { + val messageString = message.getData.toStringUtf8 + Unmarshal(messageString).to[Message].flatMap { messageBody => + log.info(s"Received a message ${message.getMessageId} for subscription $subscriptionId: $messageString") + receiver(messageBody).transform(v => { consumer.ack(); v }, t => { consumer.nack(); t }) + } + } + } + + private val subscriber = { + autoCreateSettings.foreach { subscriptionSettings => + val adminClient = SubscriptionAdminClient.create() + val subscriptionExists = Try(adminClient.getSubscription(subscriptionName)).isSuccess + if (!subscriptionExists) { + val topicName = ProjectTopicName.of(projectId, subscriptionSettings.topic) + adminClient.createSubscription( + subscriptionName, + topicName, + subscriptionSettings.pushConfig, + subscriptionSettings.ackDeadlineSeconds) + } + } + + Subscriber.newBuilder(subscriptionName, messageReceiver).build() + } + + subscriber.startAsync() + + override def stopListening(): Unit = { + subscriber.stopAsync() + } + } + + object GooglePubsubSubscriber { + final case class SubscriptionSettings(topic: String, pushConfig: PushConfig, ackDeadlineSeconds: Int) + } + + class FakePubsubSubscriber extends PubsubSubscriber { + def stopListening(): Unit = () + } +} |