aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZach Smith <zach@driver.xyz>2017-08-31 18:48:16 -0700
committerZach Smith <zach@driver.xyz>2017-08-31 18:48:16 -0700
commitc09fc3b87e4cf99c57cb393098b7dbd729688442 (patch)
treeb1a2be78dfa486e1c93c01969ab35fe2b0976cef
parentb98ef922c02aa10a8514f2a827e17b56e7a2d74a (diff)
downloaddriver-core-c09fc3b87e4cf99c57cb393098b7dbd729688442.tar.gz
driver-core-c09fc3b87e4cf99c57cb393098b7dbd729688442.tar.bz2
driver-core-c09fc3b87e4cf99c57cb393098b7dbd729688442.zip
Proposed solution for autocreating subscriptions
-rw-r--r--src/main/scala/xyz/driver/core/pubsub.scala35
1 files changed, 27 insertions, 8 deletions
diff --git a/src/main/scala/xyz/driver/core/pubsub.scala b/src/main/scala/xyz/driver/core/pubsub.scala
index d8e64e7..fcd096d 100644
--- a/src/main/scala/xyz/driver/core/pubsub.scala
+++ b/src/main/scala/xyz/driver/core/pubsub.scala
@@ -6,7 +6,7 @@ import akka.stream.Materializer
import com.google.api.core.{ApiFutureCallback, ApiFutures}
import com.google.cloud.pubsub.spi.v1._
import com.google.protobuf.ByteString
-import com.google.pubsub.v1.{PubsubMessage, SubscriptionName, TopicName}
+import com.google.pubsub.v1.{PubsubMessage, PushConfig, SubscriptionName, TopicName}
import com.typesafe.scalalogging.Logger
import scala.concurrent.{ExecutionContext, Future, Promise}
@@ -90,12 +90,13 @@ object pubsub {
def stopListening(): Unit
}
- class GooglePubsubSubscriber[Message](projectId: String,
- subscriptionId: String,
- receiver: Message => Future[Unit],
- log: Logger)(implicit messageMarshaller: Unmarshaller[String, Message],
- mat: Materializer,
- ex: ExecutionContext)
+ class GooglePubsubSubscriber[Message](
+ projectId: String,
+ subscriptionId: String,
+ receiver: Message => Future[Unit],
+ log: Logger,
+ autoCreateSettings: Option[GooglePubsubSubscriber.SubscriptionSettings] = None
+ )(implicit messageMarshaller: Unmarshaller[String, Message], mat: Materializer, ex: ExecutionContext)
extends PubsubSubscriber {
private val subscriptionName = SubscriptionName.create(projectId, subscriptionId)
@@ -110,7 +111,21 @@ object pubsub {
}
}
- private val subscriber = Subscriber.defaultBuilder(subscriptionName, messageReceiver).build()
+ private val subscriber = {
+ autoCreateSettings.foreach { subscriptionSettings =>
+ val adminClient = SubscriptionAdminClient.create()
+ val subscriptionExists = Try(adminClient.getSubscription(subscriptionName)).isSuccess
+ if (!subscriptionExists) {
+ val topicName = TopicName.create(projectId, subscriptionSettings.topic)
+ adminClient.createSubscription(subscriptionName,
+ topicName,
+ subscriptionSettings.pushConfig,
+ subscriptionSettings.ackDeadlineSeconds)
+ }
+ }
+
+ Subscriber.defaultBuilder(subscriptionName, messageReceiver).build()
+ }
subscriber.startAsync()
@@ -119,6 +134,10 @@ object pubsub {
}
}
+ object GooglePubsubSubscriber {
+ final case class SubscriptionSettings(topic: String, pushConfig: PushConfig, ackDeadlineSeconds: Int)
+ }
+
class FakePubsubSubscriber extends PubsubSubscriber {
def stopListening(): Unit = ()
}