aboutsummaryrefslogtreecommitdiff
path: root/core-messaging/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core-messaging/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala')
-rw-r--r--core-messaging/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala157
1 files changed, 157 insertions, 0 deletions
diff --git a/core-messaging/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala b/core-messaging/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala
new file mode 100644
index 0000000..8b7bca7
--- /dev/null
+++ b/core-messaging/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala
@@ -0,0 +1,157 @@
+package xyz.driver.core.messaging
+import java.nio.ByteBuffer
+import java.util
+
+import com.aliyun.mns.client.{AsyncCallback, CloudAccount}
+import com.aliyun.mns.common.ServiceException
+import com.aliyun.mns.model
+import com.aliyun.mns.model._
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future, Promise}
+
+class AliyunBus(
+ accountId: String,
+ accessId: String,
+ accessSecret: String,
+ region: String,
+ namespace: String,
+ pullTimeout: Int
+)(implicit val executionContext: ExecutionContext)
+ extends Bus {
+ private val endpoint = s"https://$accountId.mns.$region.aliyuncs.com"
+ private val cloudAccount = new CloudAccount(accessId, accessSecret, endpoint)
+ private val client = cloudAccount.getMNSClient
+
+ // When calling the asyncBatchPopMessage endpoint, alicloud returns an error if no message is received before the
+ // pullTimeout. This error is documented as MessageNotExist, however it's been observed to return InternalServerError
+ // occasionally. We mask both of these errors and return an empty list of messages
+ private val MaskedErrorCodes: Set[String] = Set("MessageNotExist", "InternalServerError")
+
+ override val defaultMaxMessages: Int = 10
+
+ case class MessageId(queueName: String, messageHandle: String)
+
+ case class Message[A](id: MessageId, data: A) extends BasicMessage[A]
+
+ case class SubscriptionConfig(
+ subscriptionPrefix: String = accessId,
+ ackTimeout: FiniteDuration = 10.seconds
+ )
+
+ override val defaultSubscriptionConfig: SubscriptionConfig = SubscriptionConfig()
+
+ private def rawTopicName(topic: Topic[_]) =
+ s"$namespace-${topic.name}"
+ private def rawSubscriptionName(config: SubscriptionConfig, topic: Topic[_]) =
+ s"$namespace-${config.subscriptionPrefix}-${topic.name}"
+
+ override def fetchMessages[A](
+ topic: Topic[A],
+ config: SubscriptionConfig,
+ maxMessages: Int): Future[Seq[Message[A]]] = {
+ import collection.JavaConverters._
+ val subscriptionName = rawSubscriptionName(config, topic)
+ val queueRef = client.getQueueRef(subscriptionName)
+
+ val promise = Promise[Seq[model.Message]]
+ queueRef.asyncBatchPopMessage(
+ maxMessages,
+ pullTimeout,
+ new AsyncCallback[util.List[model.Message]] {
+ override def onSuccess(result: util.List[model.Message]): Unit = {
+ promise.success(result.asScala)
+ }
+ override def onFail(ex: Exception): Unit = ex match {
+ case serviceException: ServiceException if MaskedErrorCodes(serviceException.getErrorCode) =>
+ promise.success(Nil)
+ case _ =>
+ promise.failure(ex)
+ }
+ }
+ )
+
+ promise.future.map(_.map { message =>
+ import scala.xml.XML
+ val messageId = MessageId(subscriptionName, message.getReceiptHandle)
+ val messageXML = XML.loadString(message.getMessageBodyAsRawString)
+ val messageNode = messageXML \ "Message"
+ val messageBytes = java.util.Base64.getDecoder.decode(messageNode.head.text)
+
+ val deserializedMessage = topic.deserialize(ByteBuffer.wrap(messageBytes))
+ Message(messageId, deserializedMessage)
+ })
+ }
+
+ override def acknowledgeMessages(messages: Seq[MessageId]): Future[Unit] = {
+ import collection.JavaConverters._
+ require(messages.nonEmpty, "Acknowledged message list must be non-empty")
+
+ val queueRef = client.getQueueRef(messages.head.queueName)
+
+ val promise = Promise[Unit]
+ queueRef.asyncBatchDeleteMessage(
+ messages.map(_.messageHandle).asJava,
+ new AsyncCallback[Void] {
+ override def onSuccess(result: Void): Unit = promise.success(())
+ override def onFail(ex: Exception): Unit = promise.failure(ex)
+ }
+ )
+
+ promise.future
+ }
+
+ override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = {
+ val topicRef = client.getTopicRef(rawTopicName(topic))
+
+ val publishMessages = messages.map { message =>
+ val promise = Promise[TopicMessage]
+
+ val topicMessage = new Base64TopicMessage
+ topicMessage.setMessageBody(topic.serialize(message).array())
+
+ topicRef.asyncPublishMessage(
+ topicMessage,
+ new AsyncCallback[TopicMessage] {
+ override def onSuccess(result: TopicMessage): Unit = promise.success(result)
+ override def onFail(ex: Exception): Unit = promise.failure(ex)
+ }
+ )
+
+ promise.future
+ }
+
+ Future.sequence(publishMessages).map(_ => ())
+ }
+
+ def createTopic(topic: Topic[_]): Future[Unit] = Future {
+ val topicName = rawTopicName(topic)
+ val topicExists = Option(client.listTopic(topicName, "", 1)).exists(!_.getResult.isEmpty)
+ if (!topicExists) {
+ val topicMeta = new TopicMeta
+ topicMeta.setTopicName(topicName)
+ client.createTopic(topicMeta)
+ }
+ }
+
+ def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = Future {
+ val subscriptionName = rawSubscriptionName(config, topic)
+ val queueExists = Option(client.listQueue(subscriptionName, "", 1)).exists(!_.getResult.isEmpty)
+
+ if (!queueExists) {
+ val topicName = rawTopicName(topic)
+ val topicRef = client.getTopicRef(topicName)
+
+ val queueMeta = new QueueMeta
+ queueMeta.setQueueName(subscriptionName)
+ queueMeta.setVisibilityTimeout(config.ackTimeout.toSeconds)
+ client.createQueue(queueMeta)
+
+ val subscriptionMeta = new SubscriptionMeta
+ subscriptionMeta.setSubscriptionName(subscriptionName)
+ subscriptionMeta.setTopicName(topicName)
+ subscriptionMeta.setEndpoint(topicRef.generateQueueEndpoint(subscriptionName))
+ topicRef.subscribe(subscriptionMeta)
+ }
+ }
+}