aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZach Smith <zach@driver.xyz>2018-10-02 16:53:19 -0700
committerGitHub <noreply@github.com>2018-10-02 16:53:19 -0700
commit2cef01adfe3ebd3a0fa1e0bbbba7f6388198ba10 (patch)
treedfd72b2bef98b57bb8346441ac7b278a2c6a769c
parent18ad50733ac937148cbbb72c73fb5477ea6b50a1 (diff)
downloaddriver-core-2cef01adfe3ebd3a0fa1e0bbbba7f6388198ba10.tar.gz
driver-core-2cef01adfe3ebd3a0fa1e0bbbba7f6388198ba10.tar.bz2
driver-core-2cef01adfe3ebd3a0fa1e0bbbba7f6388198ba10.zip
Fixes for Alicloud pubsub (#230)
* Remove recover from processMessage in StreatBus * Check if queue exists before creating
-rw-r--r--src/main/scala/xyz/driver/core/messaging/AliyunBus.scala30
-rw-r--r--src/main/scala/xyz/driver/core/messaging/StreamBus.scala2
2 files changed, 18 insertions, 14 deletions
diff --git a/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala b/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala
index c23ea0f..8b7bca7 100644
--- a/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala
+++ b/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala
@@ -136,18 +136,22 @@ class AliyunBus(
def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = Future {
val subscriptionName = rawSubscriptionName(config, topic)
- val topicName = rawTopicName(topic)
- val topicRef = client.getTopicRef(topicName)
-
- val queueMeta = new QueueMeta
- queueMeta.setQueueName(subscriptionName)
- queueMeta.setVisibilityTimeout(config.ackTimeout.toSeconds)
- client.createQueue(queueMeta)
-
- val subscriptionMeta = new SubscriptionMeta
- subscriptionMeta.setSubscriptionName(subscriptionName)
- subscriptionMeta.setTopicName(topicName)
- subscriptionMeta.setEndpoint(topicRef.generateQueueEndpoint(subscriptionName))
- topicRef.subscribe(subscriptionMeta)
+ val queueExists = Option(client.listQueue(subscriptionName, "", 1)).exists(!_.getResult.isEmpty)
+
+ if (!queueExists) {
+ val topicName = rawTopicName(topic)
+ val topicRef = client.getTopicRef(topicName)
+
+ val queueMeta = new QueueMeta
+ queueMeta.setQueueName(subscriptionName)
+ queueMeta.setVisibilityTimeout(config.ackTimeout.toSeconds)
+ client.createQueue(queueMeta)
+
+ val subscriptionMeta = new SubscriptionMeta
+ subscriptionMeta.setSubscriptionName(subscriptionName)
+ subscriptionMeta.setTopicName(topicName)
+ subscriptionMeta.setEndpoint(topicRef.generateQueueEndpoint(subscriptionName))
+ topicRef.subscribe(subscriptionMeta)
+ }
}
}
diff --git a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
index a9ba3a7..44d75cd 100644
--- a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
+++ b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
@@ -76,7 +76,7 @@ trait StreamBus extends Bus {
maxRestarts
) { () =>
subscribe(topic, config)
- .via(processMessage.recover({ case _ => Nil }))
+ .via(processMessage)
.log(topic.name)
.mapConcat(identity)
}