From c09fc3b87e4cf99c57cb393098b7dbd729688442 Mon Sep 17 00:00:00 2001 From: Zach Smith Date: Thu, 31 Aug 2017 18:48:16 -0700 Subject: Proposed solution for autocreating subscriptions --- src/main/scala/xyz/driver/core/pubsub.scala | 35 ++++++++++++++++++++++------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/src/main/scala/xyz/driver/core/pubsub.scala b/src/main/scala/xyz/driver/core/pubsub.scala index d8e64e7..fcd096d 100644 --- a/src/main/scala/xyz/driver/core/pubsub.scala +++ b/src/main/scala/xyz/driver/core/pubsub.scala @@ -6,7 +6,7 @@ import akka.stream.Materializer import com.google.api.core.{ApiFutureCallback, ApiFutures} import com.google.cloud.pubsub.spi.v1._ import com.google.protobuf.ByteString -import com.google.pubsub.v1.{PubsubMessage, SubscriptionName, TopicName} +import com.google.pubsub.v1.{PubsubMessage, PushConfig, SubscriptionName, TopicName} import com.typesafe.scalalogging.Logger import scala.concurrent.{ExecutionContext, Future, Promise} @@ -90,12 +90,13 @@ object pubsub { def stopListening(): Unit } - class GooglePubsubSubscriber[Message](projectId: String, - subscriptionId: String, - receiver: Message => Future[Unit], - log: Logger)(implicit messageMarshaller: Unmarshaller[String, Message], - mat: Materializer, - ex: ExecutionContext) + class GooglePubsubSubscriber[Message]( + projectId: String, + subscriptionId: String, + receiver: Message => Future[Unit], + log: Logger, + autoCreateSettings: Option[GooglePubsubSubscriber.SubscriptionSettings] = None + )(implicit messageMarshaller: Unmarshaller[String, Message], mat: Materializer, ex: ExecutionContext) extends PubsubSubscriber { private val subscriptionName = SubscriptionName.create(projectId, subscriptionId) @@ -110,7 +111,21 @@ object pubsub { } } - private val subscriber = Subscriber.defaultBuilder(subscriptionName, messageReceiver).build() + private val subscriber = { + autoCreateSettings.foreach { subscriptionSettings => + val adminClient = SubscriptionAdminClient.create() + val subscriptionExists = Try(adminClient.getSubscription(subscriptionName)).isSuccess + if (!subscriptionExists) { + val topicName = TopicName.create(projectId, subscriptionSettings.topic) + adminClient.createSubscription(subscriptionName, + topicName, + subscriptionSettings.pushConfig, + subscriptionSettings.ackDeadlineSeconds) + } + } + + Subscriber.defaultBuilder(subscriptionName, messageReceiver).build() + } subscriber.startAsync() @@ -119,6 +134,10 @@ object pubsub { } } + object GooglePubsubSubscriber { + final case class SubscriptionSettings(topic: String, pushConfig: PushConfig, ackDeadlineSeconds: Int) + } + class FakePubsubSubscriber extends PubsubSubscriber { def stopListening(): Unit = () } -- cgit v1.2.3