aboutsummaryrefslogblamecommitdiff
path: root/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala
blob: 66ae3775966d14bdfbff1a43d90f153c9eb7090b (plain) (tree)










































































































































                                                                                                        
package xyz.driver.core.messaging
import java.nio.ByteBuffer
import java.util

import com.aliyun.mns.client.{AsyncCallback, CloudAccount}
import com.aliyun.mns.model
import com.aliyun.mns.model._

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}

class AliyunBus(
    accountId: String,
    accessId: String,
    accessSecret: String,
    region: String,
    namespace: String,
    pullTimeout: Int)(implicit val executionContext: ExecutionContext)
    extends Bus with StreamBus with CreateBeforeStream {
  val endpoint     = s"https://$accountId.mns.$region.aliyuncs.com"
  val cloudAccount = new CloudAccount(accessId, accessSecret, endpoint)
  val client       = cloudAccount.getMNSClient

  override val defaultMaxMessages: Int = 10

  case class MessageId(queueName: String, messageHandle: String)

  case class Message[A](id: MessageId, data: A) extends BasicMessage[A]

  case class SubscriptionConfig(
      subscriptionPrefix: String = accessId,
      ackTimeout: FiniteDuration = 10.seconds
  )

  override val defaultSubscriptionConfig: SubscriptionConfig = SubscriptionConfig()

  private def rawTopicName(topic: Topic[_]) =
    s"$namespace-${topic.name}"
  private def rawSubscriptionName(config: SubscriptionConfig, topic: Topic[_]) =
    s"$namespace-${config.subscriptionPrefix}-${topic.name}"

  override def fetchMessages[A](
      topic: Topic[A],
      config: SubscriptionConfig,
      maxMessages: Int): Future[Seq[Message[A]]] = {
    import collection.JavaConverters._
    val subscriptionName = rawSubscriptionName(config, topic)
    val queueRef         = client.getQueueRef(subscriptionName)

    val promise = Promise[Seq[model.Message]]
    queueRef.asyncBatchPopMessage(
      maxMessages,
      pullTimeout,
      new AsyncCallback[util.List[model.Message]] {
        override def onSuccess(result: util.List[model.Message]): Unit = promise.success(result.asScala)
        override def onFail(ex: Exception): Unit                       = promise.failure(ex)
      }
    )

    promise.future.map(_.map { message =>
      import scala.xml.XML
      val messageId    = MessageId(subscriptionName, message.getReceiptHandle)
      val messageXML   = XML.loadString(message.getMessageBodyAsRawString)
      val messageNode  = messageXML \ "Message"
      val messageBytes = java.util.Base64.getDecoder.decode(messageNode.head.text)

      val deserializedMessage = topic.deserialize(ByteBuffer.wrap(messageBytes))
      Message(messageId, deserializedMessage)
    })
  }

  override def acknowledgeMessages(messages: Seq[MessageId]): Future[Unit] = {
    import collection.JavaConverters._
    require(messages.nonEmpty, "Acknowledged message list must be non-empty")

    val queueRef = client.getQueueRef(messages.head.queueName)

    val promise = Promise[Unit]
    queueRef.asyncBatchDeleteMessage(
      messages.map(_.messageHandle).asJava,
      new AsyncCallback[Void] {
        override def onSuccess(result: Void): Unit = promise.success(())
        override def onFail(ex: Exception): Unit   = promise.failure(ex)
      }
    )

    promise.future
  }

  override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = {
    val topicRef = client.getTopicRef(rawTopicName(topic))

    val publishMessages = messages.map { message =>
      val promise = Promise[TopicMessage]

      val topicMessage = new Base64TopicMessage
      topicMessage.setMessageBody(topic.serialize(message).array())

      topicRef.asyncPublishMessage(
        topicMessage,
        new AsyncCallback[TopicMessage] {
          override def onSuccess(result: TopicMessage): Unit = promise.success(result)
          override def onFail(ex: Exception): Unit           = promise.failure(ex)
        }
      )

      promise.future
    }

    Future.sequence(publishMessages).map(_ => ())
  }

  override def createTopic(topic: Topic[_]): Future[Unit] = Future {
    val topicName   = rawTopicName(topic)
    val topicExists = Option(client.listTopic(topicName, "", 1)).exists(!_.getResult.isEmpty)
    if (!topicExists) {
      val topicMeta = new TopicMeta
      topicMeta.setTopicName(topicName)
      client.createTopic(topicMeta)
    }
  }

  override def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = Future {
    val subscriptionName = rawSubscriptionName(config, topic)
    val topicName        = rawTopicName(topic)
    val topicRef         = client.getTopicRef(topicName)

    val queueMeta = new QueueMeta
    queueMeta.setQueueName(subscriptionName)
    queueMeta.setVisibilityTimeout(config.ackTimeout.toSeconds)
    client.createQueue(queueMeta)

    val subscriptionMeta = new SubscriptionMeta
    subscriptionMeta.setSubscriptionName(subscriptionName)
    subscriptionMeta.setTopicName(topicName)
    subscriptionMeta.setEndpoint(topicRef.generateQueueEndpoint(subscriptionName))
    topicRef.subscribe(subscriptionMeta)
  }
}