aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZach Smith <zach@driver.xyz>2018-08-20 09:37:55 -0700
committerGitHub <noreply@github.com>2018-08-20 09:37:55 -0700
commit3db799ba16c669691a587bfe6df5199cc92bf4d4 (patch)
tree9fb7b1f033463a15b9c94ddba0141f6e628be556
parent5e1aa32b1a5adaf73817b7141cbf0dc6650b5b42 (diff)
downloaddriver-core-3db799ba16c669691a587bfe6df5199cc92bf4d4.tar.gz
driver-core-3db799ba16c669691a587bfe6df5199cc92bf4d4.tar.bz2
driver-core-3db799ba16c669691a587bfe6df5199cc92bf4d4.zip
Add Aliyun Bus implementation (#193)v1.12.2
-rw-r--r--build.sbt1
-rw-r--r--src/main/scala/xyz/driver/core/messaging/AliyunBus.scala139
2 files changed, 140 insertions, 0 deletions
diff --git a/build.sbt b/build.sbt
index 150f69c..7ff5f00 100644
--- a/build.sbt
+++ b/build.sbt
@@ -34,6 +34,7 @@ lazy val core = (project in file("."))
"com.google.cloud" % "google-cloud-pubsub" % "1.31.0",
"com.google.cloud" % "google-cloud-storage" % "1.31.0",
"com.aliyun.oss" % "aliyun-sdk-oss" % "2.8.2",
+ "com.aliyun.mns" % "aliyun-sdk-mns" % "1.1.8",
"com.typesafe" % "config" % "1.3.3",
"ch.qos.logback" % "logback-classic" % "1.2.3",
"ch.qos.logback.contrib" % "logback-json-classic" % "0.1.5",
diff --git a/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala b/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala
new file mode 100644
index 0000000..66ae377
--- /dev/null
+++ b/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala
@@ -0,0 +1,139 @@
+package xyz.driver.core.messaging
+import java.nio.ByteBuffer
+import java.util
+
+import com.aliyun.mns.client.{AsyncCallback, CloudAccount}
+import com.aliyun.mns.model
+import com.aliyun.mns.model._
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future, Promise}
+
+class AliyunBus(
+ accountId: String,
+ accessId: String,
+ accessSecret: String,
+ region: String,
+ namespace: String,
+ pullTimeout: Int)(implicit val executionContext: ExecutionContext)
+ extends Bus with StreamBus with CreateBeforeStream {
+ val endpoint = s"https://$accountId.mns.$region.aliyuncs.com"
+ val cloudAccount = new CloudAccount(accessId, accessSecret, endpoint)
+ val client = cloudAccount.getMNSClient
+
+ override val defaultMaxMessages: Int = 10
+
+ case class MessageId(queueName: String, messageHandle: String)
+
+ case class Message[A](id: MessageId, data: A) extends BasicMessage[A]
+
+ case class SubscriptionConfig(
+ subscriptionPrefix: String = accessId,
+ ackTimeout: FiniteDuration = 10.seconds
+ )
+
+ override val defaultSubscriptionConfig: SubscriptionConfig = SubscriptionConfig()
+
+ private def rawTopicName(topic: Topic[_]) =
+ s"$namespace-${topic.name}"
+ private def rawSubscriptionName(config: SubscriptionConfig, topic: Topic[_]) =
+ s"$namespace-${config.subscriptionPrefix}-${topic.name}"
+
+ override def fetchMessages[A](
+ topic: Topic[A],
+ config: SubscriptionConfig,
+ maxMessages: Int): Future[Seq[Message[A]]] = {
+ import collection.JavaConverters._
+ val subscriptionName = rawSubscriptionName(config, topic)
+ val queueRef = client.getQueueRef(subscriptionName)
+
+ val promise = Promise[Seq[model.Message]]
+ queueRef.asyncBatchPopMessage(
+ maxMessages,
+ pullTimeout,
+ new AsyncCallback[util.List[model.Message]] {
+ override def onSuccess(result: util.List[model.Message]): Unit = promise.success(result.asScala)
+ override def onFail(ex: Exception): Unit = promise.failure(ex)
+ }
+ )
+
+ promise.future.map(_.map { message =>
+ import scala.xml.XML
+ val messageId = MessageId(subscriptionName, message.getReceiptHandle)
+ val messageXML = XML.loadString(message.getMessageBodyAsRawString)
+ val messageNode = messageXML \ "Message"
+ val messageBytes = java.util.Base64.getDecoder.decode(messageNode.head.text)
+
+ val deserializedMessage = topic.deserialize(ByteBuffer.wrap(messageBytes))
+ Message(messageId, deserializedMessage)
+ })
+ }
+
+ override def acknowledgeMessages(messages: Seq[MessageId]): Future[Unit] = {
+ import collection.JavaConverters._
+ require(messages.nonEmpty, "Acknowledged message list must be non-empty")
+
+ val queueRef = client.getQueueRef(messages.head.queueName)
+
+ val promise = Promise[Unit]
+ queueRef.asyncBatchDeleteMessage(
+ messages.map(_.messageHandle).asJava,
+ new AsyncCallback[Void] {
+ override def onSuccess(result: Void): Unit = promise.success(())
+ override def onFail(ex: Exception): Unit = promise.failure(ex)
+ }
+ )
+
+ promise.future
+ }
+
+ override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = {
+ val topicRef = client.getTopicRef(rawTopicName(topic))
+
+ val publishMessages = messages.map { message =>
+ val promise = Promise[TopicMessage]
+
+ val topicMessage = new Base64TopicMessage
+ topicMessage.setMessageBody(topic.serialize(message).array())
+
+ topicRef.asyncPublishMessage(
+ topicMessage,
+ new AsyncCallback[TopicMessage] {
+ override def onSuccess(result: TopicMessage): Unit = promise.success(result)
+ override def onFail(ex: Exception): Unit = promise.failure(ex)
+ }
+ )
+
+ promise.future
+ }
+
+ Future.sequence(publishMessages).map(_ => ())
+ }
+
+ override def createTopic(topic: Topic[_]): Future[Unit] = Future {
+ val topicName = rawTopicName(topic)
+ val topicExists = Option(client.listTopic(topicName, "", 1)).exists(!_.getResult.isEmpty)
+ if (!topicExists) {
+ val topicMeta = new TopicMeta
+ topicMeta.setTopicName(topicName)
+ client.createTopic(topicMeta)
+ }
+ }
+
+ override def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = Future {
+ val subscriptionName = rawSubscriptionName(config, topic)
+ val topicName = rawTopicName(topic)
+ val topicRef = client.getTopicRef(topicName)
+
+ val queueMeta = new QueueMeta
+ queueMeta.setQueueName(subscriptionName)
+ queueMeta.setVisibilityTimeout(config.ackTimeout.toSeconds)
+ client.createQueue(queueMeta)
+
+ val subscriptionMeta = new SubscriptionMeta
+ subscriptionMeta.setSubscriptionName(subscriptionName)
+ subscriptionMeta.setTopicName(topicName)
+ subscriptionMeta.setEndpoint(topicRef.generateQueueEndpoint(subscriptionName))
+ topicRef.subscribe(subscriptionMeta)
+ }
+}