aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZach Smith <zach@driver.xyz>2018-10-02 16:53:12 -0700
committerGitHub <noreply@github.com>2018-10-02 16:53:12 -0700
commitc1b74c39568d6152f1498508f5aaaaf069aba39b (patch)
tree2c3fa2560cd109da434262708501a15bc0ae20dc
parent0783a7767d69b32afb4acf3eb01d541660ea879d (diff)
downloaddriver-core-1.x.tar.gz
driver-core-1.x.tar.bz2
driver-core-1.x.zip
Fixes for alicloud pubsub (#229)v1.14.71.x
* Remove recover from processMessage in StreatBus * Check if queue exists before creating
-rw-r--r--src/main/scala/xyz/driver/core/messaging/AliyunBus.scala30
-rw-r--r--src/main/scala/xyz/driver/core/messaging/StreamBus.scala2
2 files changed, 18 insertions, 14 deletions
diff --git a/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala b/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala
index c23ea0f..8b7bca7 100644
--- a/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala
+++ b/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala
@@ -136,18 +136,22 @@ class AliyunBus(
def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = Future {
val subscriptionName = rawSubscriptionName(config, topic)
- val topicName = rawTopicName(topic)
- val topicRef = client.getTopicRef(topicName)
-
- val queueMeta = new QueueMeta
- queueMeta.setQueueName(subscriptionName)
- queueMeta.setVisibilityTimeout(config.ackTimeout.toSeconds)
- client.createQueue(queueMeta)
-
- val subscriptionMeta = new SubscriptionMeta
- subscriptionMeta.setSubscriptionName(subscriptionName)
- subscriptionMeta.setTopicName(topicName)
- subscriptionMeta.setEndpoint(topicRef.generateQueueEndpoint(subscriptionName))
- topicRef.subscribe(subscriptionMeta)
+ val queueExists = Option(client.listQueue(subscriptionName, "", 1)).exists(!_.getResult.isEmpty)
+
+ if (!queueExists) {
+ val topicName = rawTopicName(topic)
+ val topicRef = client.getTopicRef(topicName)
+
+ val queueMeta = new QueueMeta
+ queueMeta.setQueueName(subscriptionName)
+ queueMeta.setVisibilityTimeout(config.ackTimeout.toSeconds)
+ client.createQueue(queueMeta)
+
+ val subscriptionMeta = new SubscriptionMeta
+ subscriptionMeta.setSubscriptionName(subscriptionName)
+ subscriptionMeta.setTopicName(topicName)
+ subscriptionMeta.setEndpoint(topicRef.generateQueueEndpoint(subscriptionName))
+ topicRef.subscribe(subscriptionMeta)
+ }
}
}
diff --git a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
index a9ba3a7..44d75cd 100644
--- a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
+++ b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
@@ -76,7 +76,7 @@ trait StreamBus extends Bus {
maxRestarts
) { () =>
subscribe(topic, config)
- .via(processMessage.recover({ case _ => Nil }))
+ .via(processMessage)
.log(topic.name)
.mapConcat(identity)
}