diff options
Diffstat (limited to 'src/main/scala/xyz/driver/core/messaging/AliyunBus.scala')
-rw-r--r-- | src/main/scala/xyz/driver/core/messaging/AliyunBus.scala | 157 |
1 files changed, 0 insertions, 157 deletions
diff --git a/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala b/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala deleted file mode 100644 index 8b7bca7..0000000 --- a/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala +++ /dev/null @@ -1,157 +0,0 @@ -package xyz.driver.core.messaging -import java.nio.ByteBuffer -import java.util - -import com.aliyun.mns.client.{AsyncCallback, CloudAccount} -import com.aliyun.mns.common.ServiceException -import com.aliyun.mns.model -import com.aliyun.mns.model._ - -import scala.concurrent.duration._ -import scala.concurrent.{ExecutionContext, Future, Promise} - -class AliyunBus( - accountId: String, - accessId: String, - accessSecret: String, - region: String, - namespace: String, - pullTimeout: Int -)(implicit val executionContext: ExecutionContext) - extends Bus { - private val endpoint = s"https://$accountId.mns.$region.aliyuncs.com" - private val cloudAccount = new CloudAccount(accessId, accessSecret, endpoint) - private val client = cloudAccount.getMNSClient - - // When calling the asyncBatchPopMessage endpoint, alicloud returns an error if no message is received before the - // pullTimeout. This error is documented as MessageNotExist, however it's been observed to return InternalServerError - // occasionally. We mask both of these errors and return an empty list of messages - private val MaskedErrorCodes: Set[String] = Set("MessageNotExist", "InternalServerError") - - override val defaultMaxMessages: Int = 10 - - case class MessageId(queueName: String, messageHandle: String) - - case class Message[A](id: MessageId, data: A) extends BasicMessage[A] - - case class SubscriptionConfig( - subscriptionPrefix: String = accessId, - ackTimeout: FiniteDuration = 10.seconds - ) - - override val defaultSubscriptionConfig: SubscriptionConfig = SubscriptionConfig() - - private def rawTopicName(topic: Topic[_]) = - s"$namespace-${topic.name}" - private def rawSubscriptionName(config: SubscriptionConfig, topic: Topic[_]) = - s"$namespace-${config.subscriptionPrefix}-${topic.name}" - - override def fetchMessages[A]( - topic: Topic[A], - config: SubscriptionConfig, - maxMessages: Int): Future[Seq[Message[A]]] = { - import collection.JavaConverters._ - val subscriptionName = rawSubscriptionName(config, topic) - val queueRef = client.getQueueRef(subscriptionName) - - val promise = Promise[Seq[model.Message]] - queueRef.asyncBatchPopMessage( - maxMessages, - pullTimeout, - new AsyncCallback[util.List[model.Message]] { - override def onSuccess(result: util.List[model.Message]): Unit = { - promise.success(result.asScala) - } - override def onFail(ex: Exception): Unit = ex match { - case serviceException: ServiceException if MaskedErrorCodes(serviceException.getErrorCode) => - promise.success(Nil) - case _ => - promise.failure(ex) - } - } - ) - - promise.future.map(_.map { message => - import scala.xml.XML - val messageId = MessageId(subscriptionName, message.getReceiptHandle) - val messageXML = XML.loadString(message.getMessageBodyAsRawString) - val messageNode = messageXML \ "Message" - val messageBytes = java.util.Base64.getDecoder.decode(messageNode.head.text) - - val deserializedMessage = topic.deserialize(ByteBuffer.wrap(messageBytes)) - Message(messageId, deserializedMessage) - }) - } - - override def acknowledgeMessages(messages: Seq[MessageId]): Future[Unit] = { - import collection.JavaConverters._ - require(messages.nonEmpty, "Acknowledged message list must be non-empty") - - val queueRef = client.getQueueRef(messages.head.queueName) - - val promise = Promise[Unit] - queueRef.asyncBatchDeleteMessage( - messages.map(_.messageHandle).asJava, - new AsyncCallback[Void] { - override def onSuccess(result: Void): Unit = promise.success(()) - override def onFail(ex: Exception): Unit = promise.failure(ex) - } - ) - - promise.future - } - - override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = { - val topicRef = client.getTopicRef(rawTopicName(topic)) - - val publishMessages = messages.map { message => - val promise = Promise[TopicMessage] - - val topicMessage = new Base64TopicMessage - topicMessage.setMessageBody(topic.serialize(message).array()) - - topicRef.asyncPublishMessage( - topicMessage, - new AsyncCallback[TopicMessage] { - override def onSuccess(result: TopicMessage): Unit = promise.success(result) - override def onFail(ex: Exception): Unit = promise.failure(ex) - } - ) - - promise.future - } - - Future.sequence(publishMessages).map(_ => ()) - } - - def createTopic(topic: Topic[_]): Future[Unit] = Future { - val topicName = rawTopicName(topic) - val topicExists = Option(client.listTopic(topicName, "", 1)).exists(!_.getResult.isEmpty) - if (!topicExists) { - val topicMeta = new TopicMeta - topicMeta.setTopicName(topicName) - client.createTopic(topicMeta) - } - } - - def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = Future { - val subscriptionName = rawSubscriptionName(config, topic) - val queueExists = Option(client.listQueue(subscriptionName, "", 1)).exists(!_.getResult.isEmpty) - - if (!queueExists) { - val topicName = rawTopicName(topic) - val topicRef = client.getTopicRef(topicName) - - val queueMeta = new QueueMeta - queueMeta.setQueueName(subscriptionName) - queueMeta.setVisibilityTimeout(config.ackTimeout.toSeconds) - client.createQueue(queueMeta) - - val subscriptionMeta = new SubscriptionMeta - subscriptionMeta.setSubscriptionName(subscriptionName) - subscriptionMeta.setTopicName(topicName) - subscriptionMeta.setEndpoint(topicRef.generateQueueEndpoint(subscriptionName)) - topicRef.subscribe(subscriptionMeta) - } - } -} |