From b98ef922c02aa10a8514f2a827e17b56e7a2d74a Mon Sep 17 00:00:00 2001 From: Zach Smith Date: Thu, 31 Aug 2017 16:24:44 -0700 Subject: Autocreate pubsub topics on publisher creation --- src/main/scala/xyz/driver/core/pubsub.scala | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) (limited to 'src/main/scala') diff --git a/src/main/scala/xyz/driver/core/pubsub.scala b/src/main/scala/xyz/driver/core/pubsub.scala index e7e5d4a..d8e64e7 100644 --- a/src/main/scala/xyz/driver/core/pubsub.scala +++ b/src/main/scala/xyz/driver/core/pubsub.scala @@ -4,7 +4,7 @@ import akka.http.scaladsl.marshalling._ import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller} import akka.stream.Materializer import com.google.api.core.{ApiFutureCallback, ApiFutures} -import com.google.cloud.pubsub.spi.v1.{AckReplyConsumer, MessageReceiver, Publisher, Subscriber} +import com.google.cloud.pubsub.spi.v1._ import com.google.protobuf.ByteString import com.google.pubsub.v1.{PubsubMessage, SubscriptionName, TopicName} import com.typesafe.scalalogging.Logger @@ -21,14 +21,25 @@ object pubsub { def publish(message: Message): Future[Result] } - class GooglePubsubPublisher[Message](projectId: String, topicName: String, log: Logger)( + class GooglePubsubPublisher[Message](projectId: String, topic: String, log: Logger, autoCreate: Boolean = true)( implicit messageMarshaller: Marshaller[Message, String], ex: ExecutionContext ) extends PubsubPublisher[Message] { type Result = Id[PubsubMessage] - private val publisher = Publisher.defaultBuilder(TopicName.create(projectId, topicName)).build() + private val topicName = TopicName.create(projectId, topic) + + private val publisher = { + if (autoCreate) { + val adminClient = TopicAdminClient.create() + val topicExists = Try(adminClient.getTopic(topicName)).isSuccess + if (!topicExists) { + adminClient.createTopic(topicName) + } + } + Publisher.defaultBuilder(topicName).build() + } override def publish(message: Message): Future[Id[PubsubMessage]] = { @@ -44,12 +55,12 @@ object pubsub { messageIdFuture, new ApiFutureCallback[String]() { override def onSuccess(messageId: String): Unit = { - log.info(s"Published a message with topic $topicName, message id $messageId: $messageString") + log.info(s"Published a message with topic $topic, message id $messageId: $messageString") promise.complete(Try(Id[PubsubMessage](messageId))) } override def onFailure(t: Throwable): Unit = { - log.warn(s"Failed to publish a message with topic $topicName: $message", t) + log.warn(s"Failed to publish a message with topic $topic: $message", t) promise.complete(Failure(t)) } } -- cgit v1.2.3