diff options
author | Jakob Odersky <jakob@driver.xyz> | 2018-09-12 16:18:26 -0700 |
---|---|---|
committer | Jakob Odersky <jakob@odersky.com> | 2018-10-09 16:19:39 -0700 |
commit | 7c755c77afbd67ae2ded9d8b004736d4e27e208f (patch) | |
tree | e93f4590165a338ed284adeb6f4a6bd43bb16b6a /core-messaging/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala | |
parent | 76db2360364a35be31414a12cbc419a534a51744 (diff) | |
download | driver-core-7c755c77afbd67ae2ded9d8b004736d4e27e208f.tar.gz driver-core-7c755c77afbd67ae2ded9d8b004736d4e27e208f.tar.bz2 driver-core-7c755c77afbd67ae2ded9d8b004736d4e27e208f.zip |
Move storage and messaging to separate projects
Diffstat (limited to 'core-messaging/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala')
-rw-r--r-- | core-messaging/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala | 157 |
1 files changed, 157 insertions, 0 deletions
diff --git a/core-messaging/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala b/core-messaging/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala new file mode 100644 index 0000000..8b7bca7 --- /dev/null +++ b/core-messaging/src/main/scala/xyz/driver/core/messaging/AliyunBus.scala @@ -0,0 +1,157 @@ +package xyz.driver.core.messaging +import java.nio.ByteBuffer +import java.util + +import com.aliyun.mns.client.{AsyncCallback, CloudAccount} +import com.aliyun.mns.common.ServiceException +import com.aliyun.mns.model +import com.aliyun.mns.model._ + +import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, Future, Promise} + +class AliyunBus( + accountId: String, + accessId: String, + accessSecret: String, + region: String, + namespace: String, + pullTimeout: Int +)(implicit val executionContext: ExecutionContext) + extends Bus { + private val endpoint = s"https://$accountId.mns.$region.aliyuncs.com" + private val cloudAccount = new CloudAccount(accessId, accessSecret, endpoint) + private val client = cloudAccount.getMNSClient + + // When calling the asyncBatchPopMessage endpoint, alicloud returns an error if no message is received before the + // pullTimeout. This error is documented as MessageNotExist, however it's been observed to return InternalServerError + // occasionally. We mask both of these errors and return an empty list of messages + private val MaskedErrorCodes: Set[String] = Set("MessageNotExist", "InternalServerError") + + override val defaultMaxMessages: Int = 10 + + case class MessageId(queueName: String, messageHandle: String) + + case class Message[A](id: MessageId, data: A) extends BasicMessage[A] + + case class SubscriptionConfig( + subscriptionPrefix: String = accessId, + ackTimeout: FiniteDuration = 10.seconds + ) + + override val defaultSubscriptionConfig: SubscriptionConfig = SubscriptionConfig() + + private def rawTopicName(topic: Topic[_]) = + s"$namespace-${topic.name}" + private def rawSubscriptionName(config: SubscriptionConfig, topic: Topic[_]) = + s"$namespace-${config.subscriptionPrefix}-${topic.name}" + + override def fetchMessages[A]( + topic: Topic[A], + config: SubscriptionConfig, + maxMessages: Int): Future[Seq[Message[A]]] = { + import collection.JavaConverters._ + val subscriptionName = rawSubscriptionName(config, topic) + val queueRef = client.getQueueRef(subscriptionName) + + val promise = Promise[Seq[model.Message]] + queueRef.asyncBatchPopMessage( + maxMessages, + pullTimeout, + new AsyncCallback[util.List[model.Message]] { + override def onSuccess(result: util.List[model.Message]): Unit = { + promise.success(result.asScala) + } + override def onFail(ex: Exception): Unit = ex match { + case serviceException: ServiceException if MaskedErrorCodes(serviceException.getErrorCode) => + promise.success(Nil) + case _ => + promise.failure(ex) + } + } + ) + + promise.future.map(_.map { message => + import scala.xml.XML + val messageId = MessageId(subscriptionName, message.getReceiptHandle) + val messageXML = XML.loadString(message.getMessageBodyAsRawString) + val messageNode = messageXML \ "Message" + val messageBytes = java.util.Base64.getDecoder.decode(messageNode.head.text) + + val deserializedMessage = topic.deserialize(ByteBuffer.wrap(messageBytes)) + Message(messageId, deserializedMessage) + }) + } + + override def acknowledgeMessages(messages: Seq[MessageId]): Future[Unit] = { + import collection.JavaConverters._ + require(messages.nonEmpty, "Acknowledged message list must be non-empty") + + val queueRef = client.getQueueRef(messages.head.queueName) + + val promise = Promise[Unit] + queueRef.asyncBatchDeleteMessage( + messages.map(_.messageHandle).asJava, + new AsyncCallback[Void] { + override def onSuccess(result: Void): Unit = promise.success(()) + override def onFail(ex: Exception): Unit = promise.failure(ex) + } + ) + + promise.future + } + + override def publishMessages[A](topic: Topic[A], messages: Seq[A]): Future[Unit] = { + val topicRef = client.getTopicRef(rawTopicName(topic)) + + val publishMessages = messages.map { message => + val promise = Promise[TopicMessage] + + val topicMessage = new Base64TopicMessage + topicMessage.setMessageBody(topic.serialize(message).array()) + + topicRef.asyncPublishMessage( + topicMessage, + new AsyncCallback[TopicMessage] { + override def onSuccess(result: TopicMessage): Unit = promise.success(result) + override def onFail(ex: Exception): Unit = promise.failure(ex) + } + ) + + promise.future + } + + Future.sequence(publishMessages).map(_ => ()) + } + + def createTopic(topic: Topic[_]): Future[Unit] = Future { + val topicName = rawTopicName(topic) + val topicExists = Option(client.listTopic(topicName, "", 1)).exists(!_.getResult.isEmpty) + if (!topicExists) { + val topicMeta = new TopicMeta + topicMeta.setTopicName(topicName) + client.createTopic(topicMeta) + } + } + + def createSubscription(topic: Topic[_], config: SubscriptionConfig): Future[Unit] = Future { + val subscriptionName = rawSubscriptionName(config, topic) + val queueExists = Option(client.listQueue(subscriptionName, "", 1)).exists(!_.getResult.isEmpty) + + if (!queueExists) { + val topicName = rawTopicName(topic) + val topicRef = client.getTopicRef(topicName) + + val queueMeta = new QueueMeta + queueMeta.setQueueName(subscriptionName) + queueMeta.setVisibilityTimeout(config.ackTimeout.toSeconds) + client.createQueue(queueMeta) + + val subscriptionMeta = new SubscriptionMeta + subscriptionMeta.setSubscriptionName(subscriptionName) + subscriptionMeta.setTopicName(topicName) + subscriptionMeta.setEndpoint(topicRef.generateQueueEndpoint(subscriptionName)) + topicRef.subscribe(subscriptionMeta) + } + } +} |