From 3db799ba16c669691a587bfe6df5199cc92bf4d4 Mon Sep 17 00:00:00 2001 From: Zach Smith Date: Mon, 20 Aug 2018 09:37:55 -0700 Subject: Add Aliyun Bus implementation (#193) --- build.sbt | 1 + .../xyz/driver/core/messaging/AliyunBus.scala | 139 +++++++++++++++++++++ 2 files changed, 140 insertions(+) create mode 100644 src/main/scala/xyz/driver/core/messaging/AliyunBus.scala diff --git a/build.sbt b/build.sbt index 150f69c..7ff5f00 100644 --- a/build.sbt +++ b/build.sbt @@ -34,6 +34,7 @@ lazy val core = (project in file(".")) "com.google.cloud" % "google-cloud-pubsub" % "1.31.0", "com.google.cloud" % "google-cloud-storage" % "1.31.0", "com.aliyun.oss" % "aliyun-sdk-oss" % "2.8.2", + "com.aliyun.mns" % "aliyun-sdk-mns" % "1.1.8", "com.typesafe" % "config" % "1.3.3", "ch.qos.logback" % "logback-classic" % "1.2.3", "ch.qos.logback.contrib" % "logback-json-classic" % "0.1.5", diff --git a/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala b/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala new file mode 100644 index 0000000..66ae377 --- /dev/null +++ b/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala @@ -0,0 +1,139 @@ +package xyz.driver.core.messaging +import java.nio.ByteBuffer +import java.util + +import com.aliyun.mns.client.{AsyncCallback, CloudAccount} +import com.aliyun.mns.model +import com.aliyun.mns.model._ + +import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, Future, Promise} + +class AliyunBus( + accountId: String, + accessId: String, + accessSecret: String, + region: String, + namespace: String, + pullTimeout: Int)(implicit val executionContext: ExecutionContext) + extends Bus with StreamBus with CreateBeforeStream { + val endpoint = s"https://$accountId.mns.$region.aliyuncs.com" + val cloudAccount = new CloudAccount(accessId, accessSecret, endpoint) + val client = cloudAccount.getMNSClient + + override val defaultMaxMessages: Int = 10 + + case class MessageId(queueName: String, messageHandle: String) + + case class Message[A](id: MessageId, data: A) extends BasicMessage[A] + + case class SubscriptionConfig( + subscriptionPrefix: String = accessId, + ackTimeout: FiniteDuration = 10.seconds + ) + + override val defaultSubscriptionConfig: SubscriptionConfig = SubscriptionConfig() + + private def rawTopicName(topic: Topic[_]) = + s"$namespace-${topic.name}" + private def rawSubscriptionName(config: SubscriptionConfig, topic: Topic[_]) = + s"$namespace-${config.subscriptionPrefix}-${topic.name}" + + override def fetchMessages[A]( + topic: Topic[A], + config: SubscriptionConfig, + maxMessages: Int): Future[Seq[Message[A]]] = { + import collection.JavaConverters._ + val subscriptionName = rawSubscriptionName(config, topic) + val queueRef = client.getQueueRef(subscriptionName) + + val promise = Promise[Seq[model.Message]] + queueRef.asyncBatchPopMessage( + maxMessages, + pullTimeout, + new AsyncCallback[util.List[model.Message]] { + override def onSuccess(result: util.List[model.Message]): Unit = promise.success(result.asScala) + override def onFail(ex: Exception): Unit = promise.failure(ex) + } + ) + + promise.future.map(_.map { message => + import scala.xml.XML + val messageId = MessageId(subscriptionName, message.getReceiptHandle) + val messageXML = XML.loadString(message.getMessageBodyAsRawString) + val messageNode = messageXML \ "Message" + val messageBytes = java.util.Base64.getDecoder.decode(messageNode.head.text) + + val deserializedMessage = topic.deserialize(ByteBuffer.wrap(messageBytes)) + Message(messageId, deserializedMessage) + }) + } + + override def acknowledgeMessages(messages: Seq[MessageId]): Future[Unit] = { + import collection.JavaConverters._ + require(messages.nonEmpty, "Acknowledged message list must be non-empty") + + val queueRef = client.getQueueRef(messages.head.queueName) + + val promise = Promise[Unit] + queueRef.asyncBatchDeleteMessage( + messages.map(_.messageHandle).asJava, + new AsyncCallback[Void] { + override def onSuccess(result: Void): Unit = promise.success(()) + override def onFail(ex: Exception): Unit = promise.failure(ex) + } + ) + + promise.future + } + + override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = { + val topicRef = client.getTopicRef(rawTopicName(topic)) + + val publishMessages = messages.map { message => + val promise = Promise[TopicMessage] + + val topicMessage = new Base64TopicMessage + topicMessage.setMessageBody(topic.serialize(message).array()) + + topicRef.asyncPublishMessage( + topicMessage, + new AsyncCallback[TopicMessage] { + override def onSuccess(result: TopicMessage): Unit = promise.success(result) + override def onFail(ex: Exception): Unit = promise.failure(ex) + } + ) + + promise.future + } + + Future.sequence(publishMessages).map(_ => ()) + } + + override def createTopic(topic: Topic[_]): Future[Unit] = Future { + val topicName = rawTopicName(topic) + val topicExists = Option(client.listTopic(topicName, "", 1)).exists(!_.getResult.isEmpty) + if (!topicExists) { + val topicMeta = new TopicMeta + topicMeta.setTopicName(topicName) + client.createTopic(topicMeta) + } + } + + override def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = Future { + val subscriptionName = rawSubscriptionName(config, topic) + val topicName = rawTopicName(topic) + val topicRef = client.getTopicRef(topicName) + + val queueMeta = new QueueMeta + queueMeta.setQueueName(subscriptionName) + queueMeta.setVisibilityTimeout(config.ackTimeout.toSeconds) + client.createQueue(queueMeta) + + val subscriptionMeta = new SubscriptionMeta + subscriptionMeta.setSubscriptionName(subscriptionName) + subscriptionMeta.setTopicName(topicName) + subscriptionMeta.setEndpoint(topicRef.generateQueueEndpoint(subscriptionName)) + topicRef.subscribe(subscriptionMeta) + } +} -- cgit v1.2.3