aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzachdriver <zach@driver.xyz>2017-09-01 10:19:48 -0700
committerGitHub <noreply@github.com>2017-09-01 10:19:48 -0700
commitb822938ae7057af99cd03dba7e8b81233962fd54 (patch)
treeb1a2be78dfa486e1c93c01969ab35fe2b0976cef
parentef33866655cd392c16ba42fe1fcb10aa0cd21f42 (diff)
parentc09fc3b87e4cf99c57cb393098b7dbd729688442 (diff)
downloaddriver-core-b822938ae7057af99cd03dba7e8b81233962fd54.tar.gz
driver-core-b822938ae7057af99cd03dba7e8b81233962fd54.tar.bz2
driver-core-b822938ae7057af99cd03dba7e8b81233962fd54.zip
Merge pull request #62 from drivergroup/zsmith/autocreate-pubsubv0.16.8
Autocreate pubsub topics on publisher creation
-rw-r--r--src/main/scala/xyz/driver/core/pubsub.scala56
1 files changed, 43 insertions, 13 deletions
diff --git a/src/main/scala/xyz/driver/core/pubsub.scala b/src/main/scala/xyz/driver/core/pubsub.scala
index e7e5d4a..fcd096d 100644
--- a/src/main/scala/xyz/driver/core/pubsub.scala
+++ b/src/main/scala/xyz/driver/core/pubsub.scala
@@ -4,9 +4,9 @@ import akka.http.scaladsl.marshalling._
import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller}
import akka.stream.Materializer
import com.google.api.core.{ApiFutureCallback, ApiFutures}
-import com.google.cloud.pubsub.spi.v1.{AckReplyConsumer, MessageReceiver, Publisher, Subscriber}
+import com.google.cloud.pubsub.spi.v1._
import com.google.protobuf.ByteString
-import com.google.pubsub.v1.{PubsubMessage, SubscriptionName, TopicName}
+import com.google.pubsub.v1.{PubsubMessage, PushConfig, SubscriptionName, TopicName}
import com.typesafe.scalalogging.Logger
import scala.concurrent.{ExecutionContext, Future, Promise}
@@ -21,14 +21,25 @@ object pubsub {
def publish(message: Message): Future[Result]
}
- class GooglePubsubPublisher[Message](projectId: String, topicName: String, log: Logger)(
+ class GooglePubsubPublisher[Message](projectId: String, topic: String, log: Logger, autoCreate: Boolean = true)(
implicit messageMarshaller: Marshaller[Message, String],
ex: ExecutionContext
) extends PubsubPublisher[Message] {
type Result = Id[PubsubMessage]
- private val publisher = Publisher.defaultBuilder(TopicName.create(projectId, topicName)).build()
+ private val topicName = TopicName.create(projectId, topic)
+
+ private val publisher = {
+ if (autoCreate) {
+ val adminClient = TopicAdminClient.create()
+ val topicExists = Try(adminClient.getTopic(topicName)).isSuccess
+ if (!topicExists) {
+ adminClient.createTopic(topicName)
+ }
+ }
+ Publisher.defaultBuilder(topicName).build()
+ }
override def publish(message: Message): Future[Id[PubsubMessage]] = {
@@ -44,12 +55,12 @@ object pubsub {
messageIdFuture,
new ApiFutureCallback[String]() {
override def onSuccess(messageId: String): Unit = {
- log.info(s"Published a message with topic $topicName, message id $messageId: $messageString")
+ log.info(s"Published a message with topic $topic, message id $messageId: $messageString")
promise.complete(Try(Id[PubsubMessage](messageId)))
}
override def onFailure(t: Throwable): Unit = {
- log.warn(s"Failed to publish a message with topic $topicName: $message", t)
+ log.warn(s"Failed to publish a message with topic $topic: $message", t)
promise.complete(Failure(t))
}
}
@@ -79,12 +90,13 @@ object pubsub {
def stopListening(): Unit
}
- class GooglePubsubSubscriber[Message](projectId: String,
- subscriptionId: String,
- receiver: Message => Future[Unit],
- log: Logger)(implicit messageMarshaller: Unmarshaller[String, Message],
- mat: Materializer,
- ex: ExecutionContext)
+ class GooglePubsubSubscriber[Message](
+ projectId: String,
+ subscriptionId: String,
+ receiver: Message => Future[Unit],
+ log: Logger,
+ autoCreateSettings: Option[GooglePubsubSubscriber.SubscriptionSettings] = None
+ )(implicit messageMarshaller: Unmarshaller[String, Message], mat: Materializer, ex: ExecutionContext)
extends PubsubSubscriber {
private val subscriptionName = SubscriptionName.create(projectId, subscriptionId)
@@ -99,7 +111,21 @@ object pubsub {
}
}
- private val subscriber = Subscriber.defaultBuilder(subscriptionName, messageReceiver).build()
+ private val subscriber = {
+ autoCreateSettings.foreach { subscriptionSettings =>
+ val adminClient = SubscriptionAdminClient.create()
+ val subscriptionExists = Try(adminClient.getSubscription(subscriptionName)).isSuccess
+ if (!subscriptionExists) {
+ val topicName = TopicName.create(projectId, subscriptionSettings.topic)
+ adminClient.createSubscription(subscriptionName,
+ topicName,
+ subscriptionSettings.pushConfig,
+ subscriptionSettings.ackDeadlineSeconds)
+ }
+ }
+
+ Subscriber.defaultBuilder(subscriptionName, messageReceiver).build()
+ }
subscriber.startAsync()
@@ -108,6 +134,10 @@ object pubsub {
}
}
+ object GooglePubsubSubscriber {
+ final case class SubscriptionSettings(topic: String, pushConfig: PushConfig, ackDeadlineSeconds: Int)
+ }
+
class FakePubsubSubscriber extends PubsubSubscriber {
def stopListening(): Unit = ()
}