aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala
blob: 8b7bca7838f9d514f04fcb389e9009476934a82b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package xyz.driver.core.messaging
import java.nio.ByteBuffer
import java.util

import com.aliyun.mns.client.{AsyncCallback, CloudAccount}
import com.aliyun.mns.common.ServiceException
import com.aliyun.mns.model
import com.aliyun.mns.model._

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}

/** A [[Bus]] implementation backed by the Aliyun (Alibaba Cloud) MNS client.
  *
  * Every topic is addressed as `namespace-topicName`; every subscription is a queue named
  * `namespace-subscriptionPrefix-topicName` (see `createSubscription`, which creates that queue and subscribes
  * it to the topic).
  *
  * @param accountId    account id, used as the host prefix of the MNS endpoint URL
  * @param accessId     access id passed to the `CloudAccount`; also the default subscription prefix
  * @param accessSecret access secret passed to the `CloudAccount`
  * @param region       region, used in the MNS endpoint URL
  * @param namespace    prefix prepended to every raw topic and queue name
  * @param pullTimeout  wait time handed to `asyncBatchPopMessage` when fetching
  *                     (NOTE(review): presumably seconds -- confirm against the MNS SDK)
  */
class AliyunBus(
    accountId: String,
    accessId: String,
    accessSecret: String,
    region: String,
    namespace: String,
    pullTimeout: Int
)(implicit val executionContext: ExecutionContext)
    extends Bus {
  private val endpoint     = s"https://$accountId.mns.$region.aliyuncs.com"
  private val cloudAccount = new CloudAccount(accessId, accessSecret, endpoint)
  private val client       = cloudAccount.getMNSClient

  // When calling the asyncBatchPopMessage endpoint, alicloud returns an error if no message is received before the
  // pullTimeout. This error is documented as MessageNotExist, however it's been observed to return InternalServerError
  // occasionally. We mask both of these errors and return an empty list of messages
  private val MaskedErrorCodes: Set[String] = Set("MessageNotExist", "InternalServerError")

  override val defaultMaxMessages: Int = 10

  /** Identifies a received message by the queue it was popped from and its receipt handle. */
  case class MessageId(queueName: String, messageHandle: String)

  /** A deserialized message together with the id needed to acknowledge it. */
  case class Message[A](id: MessageId, data: A) extends BasicMessage[A]

  /** Per-subscription settings.
    *
    * @param subscriptionPrefix middle component of the raw queue name; defaults to the access id
    * @param ackTimeout         applied as the visibility timeout of the queue when the subscription is created
    */
  case class SubscriptionConfig(
      subscriptionPrefix: String = accessId,
      ackTimeout: FiniteDuration = 10.seconds
  )

  override val defaultSubscriptionConfig: SubscriptionConfig = SubscriptionConfig()

  /** Name under which `topic` is known to MNS. */
  private def rawTopicName(topic: Topic[_]) =
    s"$namespace-${topic.name}"

  /** Name of the MNS queue that backs the subscription of `config` to `topic`. */
  private def rawSubscriptionName(config: SubscriptionConfig, topic: Topic[_]) =
    s"$namespace-${config.subscriptionPrefix}-${topic.name}"

  /** Pops up to `maxMessages` messages from the queue backing the subscription and deserializes them.
    *
    * Each raw body is parsed as XML; the text of its first `Message` child is Base64-decoded and handed to
    * `topic.deserialize`.
    *
    * @return the received messages; an empty sequence if the service reported one of `MaskedErrorCodes`.
    *         The future fails with the exception reported by the client for any other error, or with the
    *         parsing/decoding exception if a message body is malformed.
    */
  override def fetchMessages[A](
      topic: Topic[A],
      config: SubscriptionConfig,
      maxMessages: Int): Future[Seq[Message[A]]] = {
    import collection.JavaConverters._
    val subscriptionName = rawSubscriptionName(config, topic)
    val queueRef         = client.getQueueRef(subscriptionName)

    val promise = Promise[Seq[model.Message]]()
    queueRef.asyncBatchPopMessage(
      maxMessages,
      pullTimeout,
      new AsyncCallback[util.List[model.Message]] {
        override def onSuccess(result: util.List[model.Message]): Unit = {
          promise.success(result.asScala)
        }
        override def onFail(ex: Exception): Unit = ex match {
          case serviceException: ServiceException if MaskedErrorCodes(serviceException.getErrorCode) =>
            promise.success(Nil)
          case _ =>
            promise.failure(ex)
        }
      }
    )

    promise.future.map(_.map { message =>
      import scala.xml.XML
      val messageId  = MessageId(subscriptionName, message.getReceiptHandle)
      val messageXML = XML.loadString(message.getMessageBodyAsRawString)
      // Same exception type as a bare `.head` on an empty NodeSeq, but with a message that says what went wrong.
      val messageNode = (messageXML \ "Message").headOption.getOrElse(
        throw new NoSuchElementException(
          s"No <Message> element found in the body of a message received from queue '$subscriptionName'"))
      val messageBytes = java.util.Base64.getDecoder.decode(messageNode.text)

      val deserializedMessage = topic.deserialize(ByteBuffer.wrap(messageBytes))
      Message(messageId, deserializedMessage)
    })
  }

  /** Deletes the given messages from the queues they were received from.
    *
    * Messages are grouped by `queueName` and one batch delete is issued per queue, so ids originating from
    * different subscriptions may be acknowledged in a single call.
    *
    * @param messages ids of the messages to acknowledge; must be non-empty
    * @return a future completed once every batch delete has succeeded, or failed with the first observed error
    * @throws IllegalArgumentException (synchronously) if `messages` is empty
    */
  override def acknowledgeMessages(messages: Seq[MessageId]): Future[Unit] = {
    import collection.JavaConverters._
    require(messages.nonEmpty, "Acknowledged message list must be non-empty")

    // A receipt handle is only meaningful for the queue that issued it, so never send handles of one queue to
    // another queue's reference.
    val deletions = messages.groupBy(_.queueName).toList.map {
      case (queueName, queueMessages) =>
        val promise = Promise[Unit]()
        client
          .getQueueRef(queueName)
          .asyncBatchDeleteMessage(
            queueMessages.map(_.messageHandle).asJava,
            new AsyncCallback[Void] {
              override def onSuccess(result: Void): Unit = promise.success(())
              override def onFail(ex: Exception): Unit   = promise.failure(ex)
            }
          )
        promise.future
    }

    Future.sequence(deletions).map(_ => ())
  }

  /** Publishes every message to `topic`, one asynchronous publish call per message.
    *
    * @return a future completed once all publishes have succeeded, or failed if any of them failed
    */
  override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = {
    val topicRef = client.getTopicRef(rawTopicName(topic))

    val publications = messages.map { message =>
      val promise = Promise[TopicMessage]()

      // Copy exactly the readable bytes (position until limit). `ByteBuffer.array()` would expose the whole
      // backing array regardless of position/limit/offset and throws for read-only or direct buffers.
      // `duplicate()` keeps the serializer's buffer position untouched.
      val serialized = topic.serialize(message)
      val body       = new Array[Byte](serialized.remaining())
      serialized.duplicate().get(body)

      val topicMessage = new Base64TopicMessage
      topicMessage.setMessageBody(body)

      topicRef.asyncPublishMessage(
        topicMessage,
        new AsyncCallback[TopicMessage] {
          override def onSuccess(result: TopicMessage): Unit = promise.success(result)
          override def onFail(ex: Exception): Unit           = promise.failure(ex)
        }
      )

      promise.future
    }

    Future.sequence(publications).map(_ => ())
  }

  /** Creates the MNS topic for `topic` unless a topic listing for its raw name returns a result.
    *
    * The client calls are synchronous, hence they are marked as `blocking` for the execution context.
    *
    * NOTE(review): the existence check passes the raw name as the first argument of `listTopic`; if that
    * argument is a name prefix, a longer topic name sharing the prefix would also count as "exists" -- confirm.
    */
  def createTopic(topic: Topic[_]): Future[Unit] = Future {
    scala.concurrent.blocking {
      val topicName   = rawTopicName(topic)
      val topicExists = Option(client.listTopic(topicName, "", 1)).exists(!_.getResult.isEmpty)
      if (!topicExists) {
        val topicMeta = new TopicMeta
        topicMeta.setTopicName(topicName)
        client.createTopic(topicMeta)
      }
    }
  }

  /** Creates the queue backing the subscription and subscribes it to the topic, unless a queue listing for the
    * raw subscription name returns a result.
    *
    * The queue's visibility timeout is set to `config.ackTimeout` (in seconds). The client calls are
    * synchronous, hence they are marked as `blocking` for the execution context.
    *
    * NOTE(review): when the queue already exists, the topic subscription is not (re-)created; a failure between
    * `createQueue` and `subscribe` would therefore not be repaired by calling this again -- confirm this is
    * acceptable. The same prefix caveat as in `createTopic` presumably applies to `listQueue`.
    */
  def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = Future {
    scala.concurrent.blocking {
      val subscriptionName = rawSubscriptionName(config, topic)
      val queueExists      = Option(client.listQueue(subscriptionName, "", 1)).exists(!_.getResult.isEmpty)

      if (!queueExists) {
        val topicName = rawTopicName(topic)
        val topicRef  = client.getTopicRef(topicName)

        val queueMeta = new QueueMeta
        queueMeta.setQueueName(subscriptionName)
        queueMeta.setVisibilityTimeout(config.ackTimeout.toSeconds)
        client.createQueue(queueMeta)

        val subscriptionMeta = new SubscriptionMeta
        subscriptionMeta.setSubscriptionName(subscriptionName)
        subscriptionMeta.setTopicName(topicName)
        subscriptionMeta.setEndpoint(topicRef.generateQueueEndpoint(subscriptionName))
        topicRef.subscribe(subscriptionMeta)
      }
    }
  }
}