diff options
author | zachdriver <zach@driver.xyz> | 2017-09-01 10:19:48 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-09-01 10:19:48 -0700 |
commit | b822938ae7057af99cd03dba7e8b81233962fd54 (patch) | |
tree | b1a2be78dfa486e1c93c01969ab35fe2b0976cef /src/main/scala/xyz | |
parent | ef33866655cd392c16ba42fe1fcb10aa0cd21f42 (diff) | |
parent | c09fc3b87e4cf99c57cb393098b7dbd729688442 (diff) | |
download | driver-core-b822938ae7057af99cd03dba7e8b81233962fd54.tar.gz driver-core-b822938ae7057af99cd03dba7e8b81233962fd54.tar.bz2 driver-core-b822938ae7057af99cd03dba7e8b81233962fd54.zip |
Merge pull request #62 from drivergroup/zsmith/autocreate-pubsubv0.16.8
Autocreate pubsub topics on publisher creation
Diffstat (limited to 'src/main/scala/xyz')
-rw-r--r-- | src/main/scala/xyz/driver/core/pubsub.scala | 56 |
1 files changed, 43 insertions, 13 deletions
diff --git a/src/main/scala/xyz/driver/core/pubsub.scala b/src/main/scala/xyz/driver/core/pubsub.scala index e7e5d4a..fcd096d 100644 --- a/src/main/scala/xyz/driver/core/pubsub.scala +++ b/src/main/scala/xyz/driver/core/pubsub.scala @@ -4,9 +4,9 @@ import akka.http.scaladsl.marshalling._ import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller} import akka.stream.Materializer import com.google.api.core.{ApiFutureCallback, ApiFutures} -import com.google.cloud.pubsub.spi.v1.{AckReplyConsumer, MessageReceiver, Publisher, Subscriber} +import com.google.cloud.pubsub.spi.v1._ import com.google.protobuf.ByteString -import com.google.pubsub.v1.{PubsubMessage, SubscriptionName, TopicName} +import com.google.pubsub.v1.{PubsubMessage, PushConfig, SubscriptionName, TopicName} import com.typesafe.scalalogging.Logger import scala.concurrent.{ExecutionContext, Future, Promise} @@ -21,14 +21,25 @@ object pubsub { def publish(message: Message): Future[Result] } - class GooglePubsubPublisher[Message](projectId: String, topicName: String, log: Logger)( + class GooglePubsubPublisher[Message](projectId: String, topic: String, log: Logger, autoCreate: Boolean = true)( implicit messageMarshaller: Marshaller[Message, String], ex: ExecutionContext ) extends PubsubPublisher[Message] { type Result = Id[PubsubMessage] - private val publisher = Publisher.defaultBuilder(TopicName.create(projectId, topicName)).build() + private val topicName = TopicName.create(projectId, topic) + + private val publisher = { + if (autoCreate) { + val adminClient = TopicAdminClient.create() + val topicExists = Try(adminClient.getTopic(topicName)).isSuccess + if (!topicExists) { + adminClient.createTopic(topicName) + } + } + Publisher.defaultBuilder(topicName).build() + } override def publish(message: Message): Future[Id[PubsubMessage]] = { @@ -44,12 +55,12 @@ object pubsub { messageIdFuture, new ApiFutureCallback[String]() { override def onSuccess(messageId: String): Unit = { - log.info(s"Published a message with topic $topicName, message id $messageId: $messageString") + log.info(s"Published a message with topic $topic, message id $messageId: $messageString") promise.complete(Try(Id[PubsubMessage](messageId))) } override def onFailure(t: Throwable): Unit = { - log.warn(s"Failed to publish a message with topic $topicName: $message", t) + log.warn(s"Failed to publish a message with topic $topic: $message", t) promise.complete(Failure(t)) } } @@ -79,12 +90,13 @@ object pubsub { def stopListening(): Unit } - class GooglePubsubSubscriber[Message](projectId: String, - subscriptionId: String, - receiver: Message => Future[Unit], - log: Logger)(implicit messageMarshaller: Unmarshaller[String, Message], - mat: Materializer, - ex: ExecutionContext) + class GooglePubsubSubscriber[Message]( + projectId: String, + subscriptionId: String, + receiver: Message => Future[Unit], + log: Logger, + autoCreateSettings: Option[GooglePubsubSubscriber.SubscriptionSettings] = None + )(implicit messageMarshaller: Unmarshaller[String, Message], mat: Materializer, ex: ExecutionContext) extends PubsubSubscriber { private val subscriptionName = SubscriptionName.create(projectId, subscriptionId) @@ -99,7 +111,21 @@ object pubsub { } } - private val subscriber = Subscriber.defaultBuilder(subscriptionName, messageReceiver).build() + private val subscriber = { + autoCreateSettings.foreach { subscriptionSettings => + val adminClient = SubscriptionAdminClient.create() + val subscriptionExists = Try(adminClient.getSubscription(subscriptionName)).isSuccess + if (!subscriptionExists) { + val topicName = TopicName.create(projectId, subscriptionSettings.topic) + adminClient.createSubscription(subscriptionName, + topicName, + subscriptionSettings.pushConfig, + subscriptionSettings.ackDeadlineSeconds) + } + } + + Subscriber.defaultBuilder(subscriptionName, messageReceiver).build() + } subscriber.startAsync() @@ -108,6 +134,10 @@ object pubsub { } } + object GooglePubsubSubscriber { + final case class SubscriptionSettings(topic: String, pushConfig: PushConfig, ackDeadlineSeconds: Int) + } + class FakePubsubSubscriber extends PubsubSubscriber { def stopListening(): Unit = () } |