diff options
author | Zach Smith <zach@driver.xyz> | 2018-10-02 16:53:19 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-10-02 16:53:19 -0700 |
commit | 2cef01adfe3ebd3a0fa1e0bbbba7f6388198ba10 (patch) | |
tree | dfd72b2bef98b57bb8346441ac7b278a2c6a769c /src | |
parent | 18ad50733ac937148cbbb72c73fb5477ea6b50a1 (diff) | |
download | driver-core-2cef01adfe3ebd3a0fa1e0bbbba7f6388198ba10.tar.gz driver-core-2cef01adfe3ebd3a0fa1e0bbbba7f6388198ba10.tar.bz2 driver-core-2cef01adfe3ebd3a0fa1e0bbbba7f6388198ba10.zip |
Fixes for Alicloud pubsub (#230)
* Remove recover from processMessage in StreatBus
* Check if queue exists before creating
Diffstat (limited to 'src')
-rw-r--r-- | src/main/scala/xyz/driver/core/messaging/AliyunBus.scala | 30 | ||||
-rw-r--r-- | src/main/scala/xyz/driver/core/messaging/StreamBus.scala | 2 |
2 files changed, 18 insertions, 14 deletions
diff --git a/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala b/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala index c23ea0f..8b7bca7 100644 --- a/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala +++ b/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala @@ -136,18 +136,22 @@ class AliyunBus( def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = Future { val subscriptionName = rawSubscriptionName(config, topic) - val topicName = rawTopicName(topic) - val topicRef = client.getTopicRef(topicName) - - val queueMeta = new QueueMeta - queueMeta.setQueueName(subscriptionName) - queueMeta.setVisibilityTimeout(config.ackTimeout.toSeconds) - client.createQueue(queueMeta) - - val subscriptionMeta = new SubscriptionMeta - subscriptionMeta.setSubscriptionName(subscriptionName) - subscriptionMeta.setTopicName(topicName) - subscriptionMeta.setEndpoint(topicRef.generateQueueEndpoint(subscriptionName)) - topicRef.subscribe(subscriptionMeta) + val queueExists = Option(client.listQueue(subscriptionName, "", 1)).exists(!_.getResult.isEmpty) + + if (!queueExists) { + val topicName = rawTopicName(topic) + val topicRef = client.getTopicRef(topicName) + + val queueMeta = new QueueMeta + queueMeta.setQueueName(subscriptionName) + queueMeta.setVisibilityTimeout(config.ackTimeout.toSeconds) + client.createQueue(queueMeta) + + val subscriptionMeta = new SubscriptionMeta + subscriptionMeta.setSubscriptionName(subscriptionName) + subscriptionMeta.setTopicName(topicName) + subscriptionMeta.setEndpoint(topicRef.generateQueueEndpoint(subscriptionName)) + topicRef.subscribe(subscriptionMeta) + } } } diff --git a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala index a9ba3a7..44d75cd 100644 --- a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala +++ b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala @@ -76,7 +76,7 @@ trait StreamBus extends Bus { maxRestarts ) { () => subscribe(topic, config) - .via(processMessage.recover({ case _ => Nil })) + .via(processMessage) .log(topic.name) .mapConcat(identity) } |