diff options
author | bilna <bilnap@am.amrita.edu> | 2015-01-04 19:37:48 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-01-04 19:37:48 -0800 |
commit | e767d7ddac5c2330af553f2a74b8575dfc7afb67 (patch) | |
tree | 9d7dc4b5d7a9674853ebc1b18dea9f17358abea2 /external | |
parent | 3fddc9468fa50e7683caa973fec6c52e1132268d (diff) | |
download | spark-e767d7ddac5c2330af553f2a74b8575dfc7afb67.tar.gz spark-e767d7ddac5c2330af553f2a74b8575dfc7afb67.tar.bz2 spark-e767d7ddac5c2330af553f2a74b8575dfc7afb67.zip |
[SPARK-4631] unit test for MQTT
Please review the unit test for MQTT
Author: bilna <bilnap@am.amrita.edu>
Author: Bilna P <bilna.p@gmail.com>
Closes #3844 from Bilna/master and squashes the following commits:
acea3a3 [bilna] Adding dependency with scope test
28681fa [bilna] Merge remote-tracking branch 'upstream/master'
fac3904 [bilna] Correction in Indentation and coding style
ed9db4c [bilna] Merge remote-tracking branch 'upstream/master'
4b34ee7 [Bilna P] Update MQTTStreamSuite.scala
04503cf [bilna] Added embedded broker service for mqtt test
89d804e [bilna] Merge remote-tracking branch 'upstream/master'
fc8eb28 [bilna] Merge remote-tracking branch 'upstream/master'
4b58094 [Bilna P] Update MQTTStreamSuite.scala
b1ac4ad [bilna] Added BeforeAndAfter
5f6bfd2 [bilna] Added BeforeAndAfter
e8b6623 [Bilna P] Update MQTTStreamSuite.scala
5ca6691 [Bilna P] Update MQTTStreamSuite.scala
8616495 [bilna] [SPARK-4631] unit test for MQTT
Diffstat (limited to 'external')
-rw-r--r-- | external/mqtt/pom.xml | 6 | ||||
-rw-r--r-- | external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala | 110 |
2 files changed, 101 insertions, 15 deletions
diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml index 9025915f44..d478267b60 100644 --- a/external/mqtt/pom.xml +++ b/external/mqtt/pom.xml @@ -66,6 +66,12 @@ <artifactId>junit-interface</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-core</artifactId> + <version>5.7.0</version> + <scope>test</scope> + </dependency> </dependencies> <build> <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index 84595acf45..98fe6cb301 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -17,31 +17,111 @@ package org.apache.spark.streaming.mqtt -import org.scalatest.FunSuite +import java.net.{URI, ServerSocket} -import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.activemq.broker.{TransportConnector, BrokerService} +import org.apache.spark.util.Utils +import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.concurrent.Eventually +import scala.concurrent.duration._ +import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence -class MQTTStreamSuite extends FunSuite { - - val batchDuration = Seconds(1) +class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { + private val batchDuration = Milliseconds(500) private val master: String = "local[2]" - private val framework: String = this.getClass.getSimpleName + private val freePort = findFreePort() + private val brokerUri = "//localhost:" + freePort + private val topic = "def" + private var ssc: StreamingContext = _ + private val persistenceDir = Utils.createTempDir() + private var broker: BrokerService = _ + private var connector: TransportConnector = _ - test("mqtt input stream") { - val ssc = new StreamingContext(master, framework, batchDuration) - val brokerUrl = "abc" - val topic = "def" + before { + ssc = new StreamingContext(master, framework, batchDuration) + setupMQTT() + } - // tests the API, does not actually test data receiving - val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic) - val test2: ReceiverInputDStream[String] = - MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2) + after { + if (ssc != null) { + ssc.stop() + ssc = null + } + Utils.deleteRecursively(persistenceDir) + tearDownMQTT() + } - // TODO: Actually test receiving data + test("mqtt input stream") { + val sendMessage = "MQTT demo for spark streaming" + val receiveStream: ReceiverInputDStream[String] = + MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY) + var receiveMessage: List[String] = List() + receiveStream.foreachRDD { rdd => + if (rdd.collect.length > 0) { + receiveMessage = receiveMessage ::: List(rdd.first) + receiveMessage + } + } + ssc.start() + publishData(sendMessage) + eventually(timeout(10000 milliseconds), interval(100 milliseconds)) { + assert(sendMessage.equals(receiveMessage(0))) + } ssc.stop() } + + private def setupMQTT() { + broker = new BrokerService() + connector = new TransportConnector() + connector.setName("mqtt") + connector.setUri(new URI("mqtt:" + brokerUri)) + broker.addConnector(connector) + broker.start() + } + + private def tearDownMQTT() { + if (broker != null) { + broker.stop() + broker = null + } + if (connector != null) { + connector.stop() + connector = null + } + } + + private def findFreePort(): Int = { + Utils.startServiceOnPort(23456, (trialPort: Int) => { + val socket = new ServerSocket(trialPort) + socket.close() + (null, trialPort) + })._2 + } + + def publishData(data: String): Unit = { + var client: MqttClient = null + try { + val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath) + client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence) + client.connect() + if (client.isConnected) { + val msgTopic: MqttTopic = client.getTopic(topic) + val message: MqttMessage = new MqttMessage(data.getBytes("utf-8")) + message.setQos(1) + message.setRetained(true) + for (i <- 0 to 100) + msgTopic.publish(message) + } + } finally { + client.disconnect() + client.close() + client = null + } + } } |