aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala
blob: 2fa30f112d424f942df0306ba4cf6629d0c979db (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package xyz.driver.core.messaging
import java.nio.ByteBuffer
import java.util

import com.aliyun.mns.client.{AsyncCallback, CloudAccount}
import com.aliyun.mns.model
import com.aliyun.mns.model._

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}

class AliyunBus(
    accountId: String,
    accessId: String,
    accessSecret: String,
    region: String,
    namespace: String,
    pullTimeout: Int)(implicit val executionContext: ExecutionContext)
    extends Bus with StreamBus with CreateBeforeStream {
  val endpoint     = s"https://$accountId.mns.$region.aliyuncs.com"
  val cloudAccount = new CloudAccount(accessId, accessSecret, endpoint)
  val client       = cloudAccount.getMNSClient

  override val defaultMaxMessages: Int = 10

  case class MessageId(queueName: String, messageHandle: String)

  case class Message[A](id: MessageId, data: A) extends BasicMessage[A]

  case class SubscriptionConfig(
      subscriptionPrefix: String = accessId,
      ackTimeout: FiniteDuration = 10.seconds
  )

  override val defaultSubscriptionConfig: SubscriptionConfig = SubscriptionConfig()

  private def rawTopicName(topic: Topic[_]) =
    s"$namespace-${topic.name}"
  private def rawSubscriptionName(config: SubscriptionConfig, topic: Topic[_]) =
    s"$namespace-${config.subscriptionPrefix}-${topic.name}"

  override def fetchMessages[A](
      topic: Topic[A],
      config: SubscriptionConfig,
      maxMessages: Int): Future[Seq[Message[A]]] = {
    import collection.JavaConverters._
    val subscriptionName = rawSubscriptionName(config, topic)
    val queueRef         = client.getQueueRef(subscriptionName)

    val promise = Promise[Seq[model.Message]]
    queueRef.asyncBatchPopMessage(
      maxMessages,
      pullTimeout,
      new AsyncCallback[util.List[model.Message]] {
        override def onSuccess(result: util.List[model.Message]): Unit = promise.success(result.asScala)
        override def onFail(ex: Exception): Unit                       = promise.failure(ex)
      }
    )

    promise.future.map(_.map { message =>
      import scala.xml.XML
      val messageId    = MessageId(subscriptionName, message.getReceiptHandle)
      val messageXML   = XML.loadString(message.getMessageBodyAsRawString)
      val messageNode  = messageXML \ "Message"
      val messageBytes = java.util.Base64.getDecoder.decode(messageNode.head.text)

      val deserializedMessage = topic.deserialize(ByteBuffer.wrap(messageBytes))
      Message(messageId, deserializedMessage)
    })
  }

  override def acknowledgeMessages(messages: Seq[MessageId]): Future[Unit] = {
    import collection.JavaConverters._
    require(messages.nonEmpty, "Acknowledged message list must be non-empty")

    val queueRef = client.getQueueRef(messages.head.queueName)

    val promise = Promise[Unit]
    queueRef.asyncBatchDeleteMessage(
      messages.map(_.messageHandle).asJava,
      new AsyncCallback[Void] {
        override def onSuccess(result: Void): Unit = promise.success(())
        override def onFail(ex: Exception): Unit   = promise.failure(ex)
      }
    )

    promise.future
  }

  override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = {

    System.err.println("--------------------")
    System.err.println(rawTopicName(topic))
    System.err.println("--------------------")

    val topicRef = client.getTopicRef(rawTopicName(topic))


    val publishMessages = messages.map { message =>
      val promise = Promise[TopicMessage]

      val topicMessage = new Base64TopicMessage
      topicMessage.setMessageBody(topic.serialize(message).array())

      topicRef.asyncPublishMessage(
        topicMessage,
        new AsyncCallback[TopicMessage] {
          override def onSuccess(result: TopicMessage): Unit = promise.success(result)
          override def onFail(ex: Exception): Unit           = promise.failure(ex)
        }
      )

      promise.future
    }

    Future.sequence(publishMessages).map(_ => ())
  }

  override def createTopic(topic: Topic[_]): Future[Unit] = Future {
    val topicName   = rawTopicName(topic)
    val topicExists = Option(client.listTopic(topicName, "", 1)).exists(!_.getResult.isEmpty)
    if (!topicExists) {
      val topicMeta = new TopicMeta
      topicMeta.setTopicName(topicName)
      client.createTopic(topicMeta)
    }
  }

  override def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = Future {
    val subscriptionName = rawSubscriptionName(config, topic)
    val topicName        = rawTopicName(topic)
    val topicRef         = client.getTopicRef(topicName)

    val queueMeta = new QueueMeta
    queueMeta.setQueueName(subscriptionName)
    queueMeta.setVisibilityTimeout(config.ackTimeout.toSeconds)
    client.createQueue(queueMeta)

    val subscriptionMeta = new SubscriptionMeta
    subscriptionMeta.setSubscriptionName(subscriptionName)
    subscriptionMeta.setTopicName(topicName)
    subscriptionMeta.setEndpoint(topicRef.generateQueueEndpoint(subscriptionName))
    topicRef.subscribe(subscriptionMeta)
  }
}