aboutsummaryrefslogblamecommitdiff
path: root/src/main/scala/xyz/driver/core/pubsub.scala
blob: 34b21409d44ea2bf39bd2cd031905ecb7c2a1810 (plain) (tree)
1
2
3
4
5
6
7
8
9

                       
                                       

                                                                 
                                                          
                                   
                                     
                             




                                                           
                                                                                                  

               
                                  
 




                                                 
                                                                                                                  

                                                              
                                      
 
                                   
 
                                                                 








                                                                        
                                             
     
 
                                                                         
 

                                                                  
                                                                            
 


                                                              
 
                               
                          

                                                               
                                                                                                       



                                                                 
                                                                                     



                                          
 

                      


     
                                                                     

                                                              










                                                                                               




                             
                                        




                                                                                    
                                                                                                       
                                
 
                                                                                        


                                                                                               


                                                                                                                  

                                                                                                  


       




                                                                                             
                                                                                    




                                                    


         
                                                                      
     






                                          
 



                                                                                                         


                                                       
 
package xyz.driver.core

import akka.http.scaladsl.marshalling._
import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller}
import akka.stream.Materializer
import com.google.api.core.{ApiFutureCallback, ApiFutures}
import com.google.cloud.pubsub.v1._
import com.google.protobuf.ByteString
import com.google.pubsub.v1._
import com.typesafe.scalalogging.Logger

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Try}

@deprecated("Use the message bus implementation in xyz.driver.core.messaging.GoogleBus", "1.11.5")
object pubsub {

  /** Contract for components that publish messages of type `Message`.
    *
    * @tparam Message type of the payload accepted by [[publish]]
    */
  trait PubsubPublisher[Message] {

    /** Type of the value an implementation produces for every published message. */
    type Result

    /** Publishes a single message.
      *
      * @param message payload to publish
      * @return a future completed with the implementation-specific [[Result]]
      */
    def publish(message: Message): Future[Result]
  }

  /** [[PubsubPublisher]] backed by the Google Cloud Pub/Sub `Publisher` client.
    *
    * Every message is marshalled to a `String`, sent as the UTF-8 encoded data of a `PubsubMessage`,
    * and the message id reported by the client is wrapped in an `Id[PubsubMessage]`.
    *
    * @param projectId  Google Cloud project that owns the topic
    * @param topic      name of the topic to publish to
    * @param log        logger used to report publish successes and failures
    * @param autoCreate when `true`, the topic is looked up at construction time and created
    *                   if that lookup fails
    */
  class GooglePubsubPublisher[Message](projectId: String, topic: String, log: Logger, autoCreate: Boolean = true)(
      implicit messageMarshaller: Marshaller[Message, String],
      ex: ExecutionContext
  ) extends PubsubPublisher[Message] {

    type Result = Id[PubsubMessage]

    private val topicName = ProjectTopicName.of(projectId, topic)

    private val publisher = {
      if (autoCreate) {
        val adminClient = TopicAdminClient.create()
        // The admin client is only needed for the existence check; always release it afterwards
        // so its underlying resources are not leaked for the lifetime of the publisher.
        try {
          // Any lookup failure is treated as "topic does not exist".
          val topicExists = Try(adminClient.getTopic(topicName)).isSuccess
          if (!topicExists) {
            adminClient.createTopic(topicName)
          }
        } finally {
          adminClient.close()
        }
      }
      Publisher.newBuilder(topicName).build()
    }

    /** Marshals `message` to a string and publishes it to the configured topic.
      *
      * @param message payload to publish
      * @return a future completed with the id of the published message, or failed with the
      *         marshalling or publishing error
      */
    override def publish(message: Message): Future[Id[PubsubMessage]] = {

      Marshal(message).to[String].flatMap { messageString =>
        val data          = ByteString.copyFromUtf8(messageString)
        val pubsubMessage = PubsubMessage.newBuilder().setData(data).build()

        // Bridges the client's ApiFuture into a Scala Future.
        val promise = Promise[Id[PubsubMessage]]()

        val messageIdFuture = publisher.publish(pubsubMessage)

        ApiFutures.addCallback(
          messageIdFuture,
          new ApiFutureCallback[String]() {
            override def onSuccess(messageId: String): Unit = {
              log.info(s"Published a message with topic $topic, message id $messageId: $messageString")
              promise.complete(Try(Id[PubsubMessage](messageId)))
            }

            override def onFailure(t: Throwable): Unit = {
              // Log the marshalled payload, matching what the success branch reports.
              log.warn(s"Failed to publish a message with topic $topic: $messageString", t)
              promise.complete(Failure(t))
            }
          }
        )

        promise.future
      }
    }
  }

  /** Publisher stand-in that never talks to a pubsub service: it marshals the message, logs it
    * and answers with a freshly generated id.
    *
    * @param topicName topic name used only in the log output
    * @param log       logger that receives one entry per "published" message
    */
  class FakePubsubPublisher[Message](topicName: String, log: Logger)(
      implicit messageMarshaller: Marshaller[Message, String],
      ex: ExecutionContext)
      extends PubsubPublisher[Message] {

    override type Result = Id[PubsubMessage]

    /** Marshals `message`, logs the marshalled form and yields a generated message id.
      *
      * @param message payload to pretend-publish
      * @return a future with the generated id, or failed with the marshalling error
      */
    override def publish(message: Message): Future[Result] =
      for (messageString <- Marshal(message).to[String]) yield {
        log.info(s"Published a message to a fake pubsub with topic $topicName: $messageString")
        generators.nextId[PubsubMessage]()
      }
  }

  /** Handle on a running subscription that lets the owner stop it. */
  trait PubsubSubscriber {

    /** Stops listening for further messages. */
    def stopListening(): Unit
  }

  /** Subscriber that receives messages from a Google Cloud Pub/Sub subscription and hands them
    * to `receiver`.
    *
    * Listening starts as soon as an instance is constructed. The data of each incoming message is
    * decoded as UTF-8, unmarshalled to `Message` and passed to `receiver`. The message is
    * acknowledged when the resulting future succeeds, and negatively acknowledged (and the error
    * logged) when unmarshalling or the receiver fails.
    *
    * @param projectId          Google Cloud project that owns the subscription
    * @param subscriptionId     id of the subscription to listen on
    * @param receiver           callback invoked with every successfully unmarshalled message
    * @param log                logger used to report received messages and processing failures
    * @param autoCreateSettings when defined, the subscription is looked up at construction time
    *                           and created with these settings if that lookup fails
    */
  class GooglePubsubSubscriber[Message](
      projectId: String,
      subscriptionId: String,
      receiver: Message => Future[Unit],
      log: Logger,
      autoCreateSettings: Option[GooglePubsubSubscriber.SubscriptionSettings] = None
  )(implicit messageMarshaller: Unmarshaller[String, Message], mat: Materializer, ex: ExecutionContext)
      extends PubsubSubscriber {

    private val subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId)

    private val messageReceiver = new MessageReceiver() {
      override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit = {
        val messageString = message.getData.toStringUtf8
        Unmarshal(messageString)
          .to[Message]
          .flatMap { messageBody =>
            log.info(s"Received a message ${message.getMessageId} for subscription $subscriptionId: $messageString")
            receiver(messageBody)
          }
          .onComplete {
            // Reply for every outcome: a message that is neither acked nor nacked stays
            // outstanding, and an unmarshalling failure would otherwise go unreported.
            case scala.util.Success(_) =>
              consumer.ack()
            case Failure(t) =>
              log.warn(s"Failed to process a message ${message.getMessageId} for subscription $subscriptionId", t)
              consumer.nack()
          }
      }
    }

    private val subscriber = {
      autoCreateSettings.foreach { subscriptionSettings =>
        val adminClient = SubscriptionAdminClient.create()
        // The admin client is only needed for the existence check; always release it afterwards
        // so its underlying resources are not leaked for the lifetime of the subscriber.
        try {
          // Any lookup failure is treated as "subscription does not exist".
          val subscriptionExists = Try(adminClient.getSubscription(subscriptionName)).isSuccess
          if (!subscriptionExists) {
            val topicName = ProjectTopicName.of(projectId, subscriptionSettings.topic)
            adminClient.createSubscription(
              subscriptionName,
              topicName,
              subscriptionSettings.pushConfig,
              subscriptionSettings.ackDeadlineSeconds)
          }
        } finally {
          adminClient.close()
        }
      }

      Subscriber.newBuilder(subscriptionName, messageReceiver).build()
    }

    // Start receiving immediately; callers stop it through stopListening().
    subscriber.startAsync()

    /** Asks the underlying subscriber to stop; does not wait for it to terminate. */
    override def stopListening(): Unit = {
      subscriber.stopAsync()
    }
  }

  object GooglePubsubSubscriber {

    /** Settings applied when a missing subscription is created automatically.
      *
      * @param topic              name of the topic the new subscription is attached to
      * @param pushConfig         push configuration handed to the subscription creation call
      * @param ackDeadlineSeconds acknowledgement deadline, in seconds, handed to the creation call
      */
    final case class SubscriptionSettings(
        topic: String,
        pushConfig: PushConfig,
        ackDeadlineSeconds: Int
    )
  }

  /** Subscriber stand-in that listens to nothing, so stopping it has no effect. */
  class FakePubsubSubscriber extends PubsubSubscriber {

    /** Does nothing: there is no underlying subscription to stop. */
    override def stopListening(): Unit = {}
  }
}