aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/core/pubsub.scala
blob: 34b21409d44ea2bf39bd2cd031905ecb7c2a1810 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package xyz.driver.core

import akka.http.scaladsl.marshalling._
import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller}
import akka.stream.Materializer
import com.google.api.core.{ApiFutureCallback, ApiFutures}
import com.google.cloud.pubsub.v1._
import com.google.protobuf.ByteString
import com.google.pubsub.v1._
import com.typesafe.scalalogging.Logger

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Try}

@deprecated("Use the message bus implementation in xyz.driver.core.messaging.GoogleBus", "1.11.5")
object pubsub {

  trait PubsubPublisher[Message] {

    type Result

    def publish(message: Message): Future[Result]
  }

  class GooglePubsubPublisher[Message](projectId: String, topic: String, log: Logger, autoCreate: Boolean = true)(
      implicit messageMarshaller: Marshaller[Message, String],
      ex: ExecutionContext
  ) extends PubsubPublisher[Message] {

    type Result = Id[PubsubMessage]

    private val topicName = ProjectTopicName.of(projectId, topic)

    private val publisher = {
      if (autoCreate) {
        val adminClient = TopicAdminClient.create()
        val topicExists = Try(adminClient.getTopic(topicName)).isSuccess
        if (!topicExists) {
          adminClient.createTopic(topicName)
        }
      }
      Publisher.newBuilder(topicName).build()
    }

    override def publish(message: Message): Future[Id[PubsubMessage]] = {

      Marshal(message).to[String].flatMap { messageString =>
        val data          = ByteString.copyFromUtf8(messageString)
        val pubsubMessage = PubsubMessage.newBuilder().setData(data).build()

        val promise = Promise[Id[PubsubMessage]]()

        val messageIdFuture = publisher.publish(pubsubMessage)

        ApiFutures.addCallback(
          messageIdFuture,
          new ApiFutureCallback[String]() {
            override def onSuccess(messageId: String): Unit = {
              log.info(s"Published a message with topic $topic, message id $messageId: $messageString")
              promise.complete(Try(Id[PubsubMessage](messageId)))
            }

            override def onFailure(t: Throwable): Unit = {
              log.warn(s"Failed to publish a message with topic $topic: $message", t)
              promise.complete(Failure(t))
            }
          }
        )

        promise.future
      }
    }
  }

  class FakePubsubPublisher[Message](topicName: String, log: Logger)(
      implicit messageMarshaller: Marshaller[Message, String],
      ex: ExecutionContext)
      extends PubsubPublisher[Message] {

    type Result = Id[PubsubMessage]

    def publish(message: Message): Future[Result] =
      Marshal(message).to[String].map { messageString =>
        log.info(s"Published a message to a fake pubsub with topic $topicName: $messageString")
        generators.nextId[PubsubMessage]()
      }
  }

  trait PubsubSubscriber {

    def stopListening(): Unit
  }

  class GooglePubsubSubscriber[Message](
      projectId: String,
      subscriptionId: String,
      receiver: Message => Future[Unit],
      log: Logger,
      autoCreateSettings: Option[GooglePubsubSubscriber.SubscriptionSettings] = None
  )(implicit messageMarshaller: Unmarshaller[String, Message], mat: Materializer, ex: ExecutionContext)
      extends PubsubSubscriber {

    private val subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId)

    private val messageReceiver = new MessageReceiver() {
      override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit = {
        val messageString = message.getData.toStringUtf8
        Unmarshal(messageString).to[Message].flatMap { messageBody =>
          log.info(s"Received a message ${message.getMessageId} for subscription $subscriptionId: $messageString")
          receiver(messageBody).transform(v => { consumer.ack(); v }, t => { consumer.nack(); t })
        }
      }
    }

    private val subscriber = {
      autoCreateSettings.foreach { subscriptionSettings =>
        val adminClient        = SubscriptionAdminClient.create()
        val subscriptionExists = Try(adminClient.getSubscription(subscriptionName)).isSuccess
        if (!subscriptionExists) {
          val topicName = ProjectTopicName.of(projectId, subscriptionSettings.topic)
          adminClient.createSubscription(
            subscriptionName,
            topicName,
            subscriptionSettings.pushConfig,
            subscriptionSettings.ackDeadlineSeconds)
        }
      }

      Subscriber.newBuilder(subscriptionName, messageReceiver).build()
    }

    subscriber.startAsync()

    override def stopListening(): Unit = {
      subscriber.stopAsync()
    }
  }

  object GooglePubsubSubscriber {
    final case class SubscriptionSettings(topic: String, pushConfig: PushConfig, ackDeadlineSeconds: Int)
  }

  class FakePubsubSubscriber extends PubsubSubscriber {
    def stopListening(): Unit = ()
  }
}