aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/xyz/driver/core/messaging/AliyunBus.scala')
-rw-r--r--src/main/scala/xyz/driver/core/messaging/AliyunBus.scala30
1 files changed, 17 insertions, 13 deletions
diff --git a/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala b/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala
index c23ea0f..8b7bca7 100644
--- a/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala
+++ b/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala
@@ -136,18 +136,22 @@ class AliyunBus(
def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = Future {
val subscriptionName = rawSubscriptionName(config, topic)
- val topicName = rawTopicName(topic)
- val topicRef = client.getTopicRef(topicName)
-
- val queueMeta = new QueueMeta
- queueMeta.setQueueName(subscriptionName)
- queueMeta.setVisibilityTimeout(config.ackTimeout.toSeconds)
- client.createQueue(queueMeta)
-
- val subscriptionMeta = new SubscriptionMeta
- subscriptionMeta.setSubscriptionName(subscriptionName)
- subscriptionMeta.setTopicName(topicName)
- subscriptionMeta.setEndpoint(topicRef.generateQueueEndpoint(subscriptionName))
- topicRef.subscribe(subscriptionMeta)
+ val queueExists = Option(client.listQueue(subscriptionName, "", 1)).exists(!_.getResult.isEmpty)
+
+ if (!queueExists) {
+ val topicName = rawTopicName(topic)
+ val topicRef = client.getTopicRef(topicName)
+
+ val queueMeta = new QueueMeta
+ queueMeta.setQueueName(subscriptionName)
+ queueMeta.setVisibilityTimeout(config.ackTimeout.toSeconds)
+ client.createQueue(queueMeta)
+
+ val subscriptionMeta = new SubscriptionMeta
+ subscriptionMeta.setSubscriptionName(subscriptionName)
+ subscriptionMeta.setTopicName(topicName)
+ subscriptionMeta.setEndpoint(topicRef.generateQueueEndpoint(subscriptionName))
+ topicRef.subscribe(subscriptionMeta)
+ }
}
}