From e767d7ddac5c2330af553f2a74b8575dfc7afb67 Mon Sep 17 00:00:00 2001 From: bilna Date: Sun, 4 Jan 2015 19:37:48 -0800 Subject: [SPARK-4631] unit test for MQTT Please review the unit test for MQTT Author: bilna Author: Bilna P Closes #3844 from Bilna/master and squashes the following commits: acea3a3 [bilna] Adding dependency with scope test 28681fa [bilna] Merge remote-tracking branch 'upstream/master' fac3904 [bilna] Correction in Indentation and coding style ed9db4c [bilna] Merge remote-tracking branch 'upstream/master' 4b34ee7 [Bilna P] Update MQTTStreamSuite.scala 04503cf [bilna] Added embedded broker service for mqtt test 89d804e [bilna] Merge remote-tracking branch 'upstream/master' fc8eb28 [bilna] Merge remote-tracking branch 'upstream/master' 4b58094 [Bilna P] Update MQTTStreamSuite.scala b1ac4ad [bilna] Added BeforeAndAfter 5f6bfd2 [bilna] Added BeforeAndAfter e8b6623 [Bilna P] Update MQTTStreamSuite.scala 5ca6691 [Bilna P] Update MQTTStreamSuite.scala 8616495 [bilna] [SPARK-4631] unit test for MQTT --- external/mqtt/pom.xml | 6 ++ .../spark/streaming/mqtt/MQTTStreamSuite.scala | 110 ++++++++++++++++++--- 2 files changed, 101 insertions(+), 15 deletions(-) (limited to 'external/mqtt') diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml index 9025915f44..d478267b60 100644 --- a/external/mqtt/pom.xml +++ b/external/mqtt/pom.xml @@ -66,6 +66,12 @@ junit-interface test + + org.apache.activemq + activemq-core + 5.7.0 + test + target/scala-${scala.binary.version}/classes diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index 84595acf45..98fe6cb301 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -17,31 +17,111 @@ package org.apache.spark.streaming.mqtt -import org.scalatest.FunSuite +import java.net.{URI, ServerSocket} -import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.activemq.broker.{TransportConnector, BrokerService} +import org.apache.spark.util.Utils +import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.concurrent.Eventually +import scala.concurrent.duration._ +import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence -class MQTTStreamSuite extends FunSuite { - - val batchDuration = Seconds(1) +class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { + private val batchDuration = Milliseconds(500) private val master: String = "local[2]" - private val framework: String = this.getClass.getSimpleName + private val freePort = findFreePort() + private val brokerUri = "//localhost:" + freePort + private val topic = "def" + private var ssc: StreamingContext = _ + private val persistenceDir = Utils.createTempDir() + private var broker: BrokerService = _ + private var connector: TransportConnector = _ - test("mqtt input stream") { - val ssc = new StreamingContext(master, framework, batchDuration) - val brokerUrl = "abc" - val topic = "def" + before { + ssc = new StreamingContext(master, framework, batchDuration) + setupMQTT() + } - // tests the API, does not actually test data receiving - val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic) - val test2: ReceiverInputDStream[String] = - MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2) + after { + if (ssc != null) { + ssc.stop() + ssc = null + } + Utils.deleteRecursively(persistenceDir) + tearDownMQTT() + } - // TODO: Actually test receiving data + test("mqtt input stream") { + val sendMessage = "MQTT demo for spark streaming" + val receiveStream: ReceiverInputDStream[String] = + MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY) + var receiveMessage: List[String] = List() + receiveStream.foreachRDD { rdd => + if (rdd.collect.length > 0) { + receiveMessage = receiveMessage ::: List(rdd.first) + receiveMessage + } + } + ssc.start() + publishData(sendMessage) + eventually(timeout(10000 milliseconds), interval(100 milliseconds)) { + assert(sendMessage.equals(receiveMessage(0))) + } ssc.stop() } + + private def setupMQTT() { + broker = new BrokerService() + connector = new TransportConnector() + connector.setName("mqtt") + connector.setUri(new URI("mqtt:" + brokerUri)) + broker.addConnector(connector) + broker.start() + } + + private def tearDownMQTT() { + if (broker != null) { + broker.stop() + broker = null + } + if (connector != null) { + connector.stop() + connector = null + } + } + + private def findFreePort(): Int = { + Utils.startServiceOnPort(23456, (trialPort: Int) => { + val socket = new ServerSocket(trialPort) + socket.close() + (null, trialPort) + })._2 + } + + def publishData(data: String): Unit = { + var client: MqttClient = null + try { + val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath) + client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence) + client.connect() + if (client.isConnected) { + val msgTopic: MqttTopic = client.getTopic(topic) + val message: MqttMessage = new MqttMessage(data.getBytes("utf-8")) + message.setQos(1) + message.setRetained(true) + for (i <- 0 to 100) + msgTopic.publish(message) + } + } finally { + client.disconnect() + client.close() + client = null + } + } } -- cgit v1.2.3