aboutsummaryrefslogblamecommitdiff
path: root/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala
blob: c23ea0f925805d26cee44b51f94f0a13c520314a (plain) (tree)
1
2
3
4
5
6




                                                          
                                             











                                                           

                                                  
                 







                                                                                                                       































                                                                                   


                                                                          
                                                             
                                                                                                       
                                

                               
         























































                                                                                      
                                                           








                                                                                             
                                                                                              















                                                                                  
package xyz.driver.core.messaging
import java.nio.ByteBuffer
import java.util

import com.aliyun.mns.client.{AsyncCallback, CloudAccount}
import com.aliyun.mns.common.ServiceException
import com.aliyun.mns.model
import com.aliyun.mns.model._

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}

class AliyunBus(
    accountId: String,
    accessId: String,
    accessSecret: String,
    region: String,
    namespace: String,
    pullTimeout: Int
)(implicit val executionContext: ExecutionContext)
    extends Bus {
  private val endpoint     = s"https://$accountId.mns.$region.aliyuncs.com"
  private val cloudAccount = new CloudAccount(accessId, accessSecret, endpoint)
  private val client       = cloudAccount.getMNSClient

  // When calling the asyncBatchPopMessage endpoint, alicloud returns an error if no message is received before the
  // pullTimeout. This error is documented as MessageNotExist, however it's been observed to return InternalServerError
  // occasionally. We mask both of these errors and return an empty list of messages
  private val MaskedErrorCodes: Set[String] = Set("MessageNotExist", "InternalServerError")

  override val defaultMaxMessages: Int = 10

  case class MessageId(queueName: String, messageHandle: String)

  case class Message[A](id: MessageId, data: A) extends BasicMessage[A]

  case class SubscriptionConfig(
      subscriptionPrefix: String = accessId,
      ackTimeout: FiniteDuration = 10.seconds
  )

  override val defaultSubscriptionConfig: SubscriptionConfig = SubscriptionConfig()

  private def rawTopicName(topic: Topic[_]) =
    s"$namespace-${topic.name}"
  private def rawSubscriptionName(config: SubscriptionConfig, topic: Topic[_]) =
    s"$namespace-${config.subscriptionPrefix}-${topic.name}"

  override def fetchMessages[A](
      topic: Topic[A],
      config: SubscriptionConfig,
      maxMessages: Int): Future[Seq[Message[A]]] = {
    import collection.JavaConverters._
    val subscriptionName = rawSubscriptionName(config, topic)
    val queueRef         = client.getQueueRef(subscriptionName)

    val promise = Promise[Seq[model.Message]]
    queueRef.asyncBatchPopMessage(
      maxMessages,
      pullTimeout,
      new AsyncCallback[util.List[model.Message]] {
        override def onSuccess(result: util.List[model.Message]): Unit = {
          promise.success(result.asScala)
        }
        override def onFail(ex: Exception): Unit = ex match {
          case serviceException: ServiceException if MaskedErrorCodes(serviceException.getErrorCode) =>
            promise.success(Nil)
          case _ =>
            promise.failure(ex)
        }
      }
    )

    promise.future.map(_.map { message =>
      import scala.xml.XML
      val messageId    = MessageId(subscriptionName, message.getReceiptHandle)
      val messageXML   = XML.loadString(message.getMessageBodyAsRawString)
      val messageNode  = messageXML \ "Message"
      val messageBytes = java.util.Base64.getDecoder.decode(messageNode.head.text)

      val deserializedMessage = topic.deserialize(ByteBuffer.wrap(messageBytes))
      Message(messageId, deserializedMessage)
    })
  }

  override def acknowledgeMessages(messages: Seq[MessageId]): Future[Unit] = {
    import collection.JavaConverters._
    require(messages.nonEmpty, "Acknowledged message list must be non-empty")

    val queueRef = client.getQueueRef(messages.head.queueName)

    val promise = Promise[Unit]
    queueRef.asyncBatchDeleteMessage(
      messages.map(_.messageHandle).asJava,
      new AsyncCallback[Void] {
        override def onSuccess(result: Void): Unit = promise.success(())
        override def onFail(ex: Exception): Unit   = promise.failure(ex)
      }
    )

    promise.future
  }

  override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = {
    val topicRef = client.getTopicRef(rawTopicName(topic))

    val publishMessages = messages.map { message =>
      val promise = Promise[TopicMessage]

      val topicMessage = new Base64TopicMessage
      topicMessage.setMessageBody(topic.serialize(message).array())

      topicRef.asyncPublishMessage(
        topicMessage,
        new AsyncCallback[TopicMessage] {
          override def onSuccess(result: TopicMessage): Unit = promise.success(result)
          override def onFail(ex: Exception): Unit           = promise.failure(ex)
        }
      )

      promise.future
    }

    Future.sequence(publishMessages).map(_ => ())
  }

  def createTopic(topic: Topic[_]): Future[Unit] = Future {
    val topicName   = rawTopicName(topic)
    val topicExists = Option(client.listTopic(topicName, "", 1)).exists(!_.getResult.isEmpty)
    if (!topicExists) {
      val topicMeta = new TopicMeta
      topicMeta.setTopicName(topicName)
      client.createTopic(topicMeta)
    }
  }

  def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = Future {
    val subscriptionName = rawSubscriptionName(config, topic)
    val topicName        = rawTopicName(topic)
    val topicRef         = client.getTopicRef(topicName)

    val queueMeta = new QueueMeta
    queueMeta.setQueueName(subscriptionName)
    queueMeta.setVisibilityTimeout(config.ackTimeout.toSeconds)
    client.createQueue(queueMeta)

    val subscriptionMeta = new SubscriptionMeta
    subscriptionMeta.setSubscriptionName(subscriptionName)
    subscriptionMeta.setTopicName(topicName)
    subscriptionMeta.setEndpoint(topicRef.generateQueueEndpoint(subscriptionName))
    topicRef.subscribe(subscriptionMeta)
  }
}