aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/core/pubsub.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/xyz/driver/core/pubsub.scala')
-rw-r--r--src/main/scala/xyz/driver/core/pubsub.scala145
1 files changed, 0 insertions, 145 deletions
diff --git a/src/main/scala/xyz/driver/core/pubsub.scala b/src/main/scala/xyz/driver/core/pubsub.scala
deleted file mode 100644
index 6d2667f..0000000
--- a/src/main/scala/xyz/driver/core/pubsub.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-package xyz.driver.core
-
-import akka.http.scaladsl.marshalling._
-import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller}
-import akka.stream.Materializer
-import com.google.api.core.{ApiFutureCallback, ApiFutures}
-import com.google.cloud.pubsub.v1._
-import com.google.protobuf.ByteString
-import com.google.pubsub.v1._
-import com.typesafe.scalalogging.Logger
-
-import scala.concurrent.{ExecutionContext, Future, Promise}
-import scala.util.{Failure, Try}
-
-object pubsub {
-
- trait PubsubPublisher[Message] {
-
- type Result
-
- def publish(message: Message): Future[Result]
- }
-
- class GooglePubsubPublisher[Message](projectId: String, topic: String, log: Logger, autoCreate: Boolean = true)(
- implicit messageMarshaller: Marshaller[Message, String],
- ex: ExecutionContext
- ) extends PubsubPublisher[Message] {
-
- type Result = Id[PubsubMessage]
-
- private val topicName = ProjectTopicName.of(projectId, topic)
-
- private val publisher = {
- if (autoCreate) {
- val adminClient = TopicAdminClient.create()
- val topicExists = Try(adminClient.getTopic(topicName)).isSuccess
- if (!topicExists) {
- adminClient.createTopic(topicName)
- }
- }
- Publisher.newBuilder(topicName).build()
- }
-
- override def publish(message: Message): Future[Id[PubsubMessage]] = {
-
- Marshal(message).to[String].flatMap { messageString =>
- val data = ByteString.copyFromUtf8(messageString)
- val pubsubMessage = PubsubMessage.newBuilder().setData(data).build()
-
- val promise = Promise[Id[PubsubMessage]]()
-
- val messageIdFuture = publisher.publish(pubsubMessage)
-
- ApiFutures.addCallback(
- messageIdFuture,
- new ApiFutureCallback[String]() {
- override def onSuccess(messageId: String): Unit = {
- log.info(s"Published a message with topic $topic, message id $messageId: $messageString")
- promise.complete(Try(Id[PubsubMessage](messageId)))
- }
-
- override def onFailure(t: Throwable): Unit = {
- log.warn(s"Failed to publish a message with topic $topic: $message", t)
- promise.complete(Failure(t))
- }
- }
- )
-
- promise.future
- }
- }
- }
-
- class FakePubsubPublisher[Message](topicName: String, log: Logger)(
- implicit messageMarshaller: Marshaller[Message, String],
- ex: ExecutionContext)
- extends PubsubPublisher[Message] {
-
- type Result = Id[PubsubMessage]
-
- def publish(message: Message): Future[Result] =
- Marshal(message).to[String].map { messageString =>
- log.info(s"Published a message to a fake pubsub with topic $topicName: $messageString")
- generators.nextId[PubsubMessage]()
- }
- }
-
- trait PubsubSubscriber {
-
- def stopListening(): Unit
- }
-
- class GooglePubsubSubscriber[Message](
- projectId: String,
- subscriptionId: String,
- receiver: Message => Future[Unit],
- log: Logger,
- autoCreateSettings: Option[GooglePubsubSubscriber.SubscriptionSettings] = None
- )(implicit messageMarshaller: Unmarshaller[String, Message], mat: Materializer, ex: ExecutionContext)
- extends PubsubSubscriber {
-
- private val subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId)
-
- private val messageReceiver = new MessageReceiver() {
- override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit = {
- val messageString = message.getData.toStringUtf8
- Unmarshal(messageString).to[Message].flatMap { messageBody =>
- log.info(s"Received a message ${message.getMessageId} for subscription $subscriptionId: $messageString")
- receiver(messageBody).transform(v => { consumer.ack(); v }, t => { consumer.nack(); t })
- }
- }
- }
-
- private val subscriber = {
- autoCreateSettings.foreach { subscriptionSettings =>
- val adminClient = SubscriptionAdminClient.create()
- val subscriptionExists = Try(adminClient.getSubscription(subscriptionName)).isSuccess
- if (!subscriptionExists) {
- val topicName = ProjectTopicName.of(projectId, subscriptionSettings.topic)
- adminClient.createSubscription(
- subscriptionName,
- topicName,
- subscriptionSettings.pushConfig,
- subscriptionSettings.ackDeadlineSeconds)
- }
- }
-
- Subscriber.newBuilder(subscriptionName, messageReceiver).build()
- }
-
- subscriber.startAsync()
-
- override def stopListening(): Unit = {
- subscriber.stopAsync()
- }
- }
-
- object GooglePubsubSubscriber {
- final case class SubscriptionSettings(topic: String, pushConfig: PushConfig, ackDeadlineSeconds: Int)
- }
-
- class FakePubsubSubscriber extends PubsubSubscriber {
- def stopListening(): Unit = ()
- }
-}