aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZach Smith <zach@driver.xyz>2017-08-31 16:24:44 -0700
committerZach Smith <zach@driver.xyz>2017-08-31 16:24:44 -0700
commitb98ef922c02aa10a8514f2a827e17b56e7a2d74a (patch)
treeda54124b6f76f51587763a3e812a3fce0a8d6390
parentef33866655cd392c16ba42fe1fcb10aa0cd21f42 (diff)
downloaddriver-core-b98ef922c02aa10a8514f2a827e17b56e7a2d74a.tar.gz
driver-core-b98ef922c02aa10a8514f2a827e17b56e7a2d74a.tar.bz2
driver-core-b98ef922c02aa10a8514f2a827e17b56e7a2d74a.zip
Autocreate pubsub topics on publisher creation
-rw-r--r--src/main/scala/xyz/driver/core/pubsub.scala21
1 files changed, 16 insertions, 5 deletions
diff --git a/src/main/scala/xyz/driver/core/pubsub.scala b/src/main/scala/xyz/driver/core/pubsub.scala
index e7e5d4a..d8e64e7 100644
--- a/src/main/scala/xyz/driver/core/pubsub.scala
+++ b/src/main/scala/xyz/driver/core/pubsub.scala
@@ -4,7 +4,7 @@ import akka.http.scaladsl.marshalling._
import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller}
import akka.stream.Materializer
import com.google.api.core.{ApiFutureCallback, ApiFutures}
-import com.google.cloud.pubsub.spi.v1.{AckReplyConsumer, MessageReceiver, Publisher, Subscriber}
+import com.google.cloud.pubsub.spi.v1._
import com.google.protobuf.ByteString
import com.google.pubsub.v1.{PubsubMessage, SubscriptionName, TopicName}
import com.typesafe.scalalogging.Logger
@@ -21,14 +21,25 @@ object pubsub {
def publish(message: Message): Future[Result]
}
- class GooglePubsubPublisher[Message](projectId: String, topicName: String, log: Logger)(
+ class GooglePubsubPublisher[Message](projectId: String, topic: String, log: Logger, autoCreate: Boolean = true)(
implicit messageMarshaller: Marshaller[Message, String],
ex: ExecutionContext
) extends PubsubPublisher[Message] {
type Result = Id[PubsubMessage]
- private val publisher = Publisher.defaultBuilder(TopicName.create(projectId, topicName)).build()
+ private val topicName = TopicName.create(projectId, topic)
+
+ private val publisher = {
+ if (autoCreate) {
+ val adminClient = TopicAdminClient.create()
+ val topicExists = Try(adminClient.getTopic(topicName)).isSuccess
+ if (!topicExists) {
+ adminClient.createTopic(topicName)
+ }
+ }
+ Publisher.defaultBuilder(topicName).build()
+ }
override def publish(message: Message): Future[Id[PubsubMessage]] = {
@@ -44,12 +55,12 @@ object pubsub {
messageIdFuture,
new ApiFutureCallback[String]() {
override def onSuccess(messageId: String): Unit = {
- log.info(s"Published a message with topic $topicName, message id $messageId: $messageString")
+ log.info(s"Published a message with topic $topic, message id $messageId: $messageString")
promise.complete(Try(Id[PubsubMessage](messageId)))
}
override def onFailure(t: Throwable): Unit = {
- log.warn(s"Failed to publish a message with topic $topicName: $message", t)
+ log.warn(s"Failed to publish a message with topic $topic: $message", t)
promise.complete(Failure(t))
}
}