From c1b74c39568d6152f1498508f5aaaaf069aba39b Mon Sep 17 00:00:00 2001 From: Zach Smith Date: Tue, 2 Oct 2018 16:53:12 -0700 Subject: Fixes for alicloud pubsub (#229) * Remove recover from processMessage in StreatBus * Check if queue exists before creating --- .../xyz/driver/core/messaging/AliyunBus.scala | 30 ++++++++++++---------- 1 file changed, 17 insertions(+), 13 deletions(-) (limited to 'src/main/scala/xyz/driver/core/messaging/AliyunBus.scala') diff --git a/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala b/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala index c23ea0f..8b7bca7 100644 --- a/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala +++ b/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala @@ -136,18 +136,22 @@ class AliyunBus( def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = Future { val subscriptionName = rawSubscriptionName(config, topic) - val topicName = rawTopicName(topic) - val topicRef = client.getTopicRef(topicName) - - val queueMeta = new QueueMeta - queueMeta.setQueueName(subscriptionName) - queueMeta.setVisibilityTimeout(config.ackTimeout.toSeconds) - client.createQueue(queueMeta) - - val subscriptionMeta = new SubscriptionMeta - subscriptionMeta.setSubscriptionName(subscriptionName) - subscriptionMeta.setTopicName(topicName) - subscriptionMeta.setEndpoint(topicRef.generateQueueEndpoint(subscriptionName)) - topicRef.subscribe(subscriptionMeta) + val queueExists = Option(client.listQueue(subscriptionName, "", 1)).exists(!_.getResult.isEmpty) + + if (!queueExists) { + val topicName = rawTopicName(topic) + val topicRef = client.getTopicRef(topicName) + + val queueMeta = new QueueMeta + queueMeta.setQueueName(subscriptionName) + queueMeta.setVisibilityTimeout(config.ackTimeout.toSeconds) + client.createQueue(queueMeta) + + val subscriptionMeta = new SubscriptionMeta + subscriptionMeta.setSubscriptionName(subscriptionName) + subscriptionMeta.setTopicName(topicName) + subscriptionMeta.setEndpoint(topicRef.generateQueueEndpoint(subscriptionName)) + topicRef.subscribe(subscriptionMeta) + } } } -- cgit v1.2.3