aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorbilna <bilnap@am.amrita.edu>2015-01-04 19:37:48 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2015-01-04 19:37:48 -0800
commite767d7ddac5c2330af553f2a74b8575dfc7afb67 (patch)
tree9d7dc4b5d7a9674853ebc1b18dea9f17358abea2 /external
parent3fddc9468fa50e7683caa973fec6c52e1132268d (diff)
downloadspark-e767d7ddac5c2330af553f2a74b8575dfc7afb67.tar.gz
spark-e767d7ddac5c2330af553f2a74b8575dfc7afb67.tar.bz2
spark-e767d7ddac5c2330af553f2a74b8575dfc7afb67.zip
[SPARK-4631] unit test for MQTT
Please review the unit test for MQTT Author: bilna <bilnap@am.amrita.edu> Author: Bilna P <bilna.p@gmail.com> Closes #3844 from Bilna/master and squashes the following commits: acea3a3 [bilna] Adding dependency with scope test 28681fa [bilna] Merge remote-tracking branch 'upstream/master' fac3904 [bilna] Correction in Indentation and coding style ed9db4c [bilna] Merge remote-tracking branch 'upstream/master' 4b34ee7 [Bilna P] Update MQTTStreamSuite.scala 04503cf [bilna] Added embedded broker service for mqtt test 89d804e [bilna] Merge remote-tracking branch 'upstream/master' fc8eb28 [bilna] Merge remote-tracking branch 'upstream/master' 4b58094 [Bilna P] Update MQTTStreamSuite.scala b1ac4ad [bilna] Added BeforeAndAfter 5f6bfd2 [bilna] Added BeforeAndAfter e8b6623 [Bilna P] Update MQTTStreamSuite.scala 5ca6691 [Bilna P] Update MQTTStreamSuite.scala 8616495 [bilna] [SPARK-4631] unit test for MQTT
Diffstat (limited to 'external')
-rw-r--r--external/mqtt/pom.xml6
-rw-r--r--external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala110
2 files changed, 101 insertions, 15 deletions
diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml
index 9025915f44..d478267b60 100644
--- a/external/mqtt/pom.xml
+++ b/external/mqtt/pom.xml
@@ -66,6 +66,12 @@
<artifactId>junit-interface</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-core</artifactId>
+ <version>5.7.0</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 84595acf45..98fe6cb301 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -17,31 +17,111 @@
package org.apache.spark.streaming.mqtt
-import org.scalatest.FunSuite
+import java.net.{URI, ServerSocket}
-import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.activemq.broker.{TransportConnector, BrokerService}
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually
+import scala.concurrent.duration._
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
-class MQTTStreamSuite extends FunSuite {
-
- val batchDuration = Seconds(1)
+class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
+ private val batchDuration = Milliseconds(500)
private val master: String = "local[2]"
-
private val framework: String = this.getClass.getSimpleName
+ private val freePort = findFreePort()
+ private val brokerUri = "//localhost:" + freePort
+ private val topic = "def"
+ private var ssc: StreamingContext = _
+ private val persistenceDir = Utils.createTempDir()
+ private var broker: BrokerService = _
+ private var connector: TransportConnector = _
- test("mqtt input stream") {
- val ssc = new StreamingContext(master, framework, batchDuration)
- val brokerUrl = "abc"
- val topic = "def"
+ before {
+ ssc = new StreamingContext(master, framework, batchDuration)
+ setupMQTT()
+ }
- // tests the API, does not actually test data receiving
- val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
- val test2: ReceiverInputDStream[String] =
- MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
+ after {
+ if (ssc != null) {
+ ssc.stop()
+ ssc = null
+ }
+ Utils.deleteRecursively(persistenceDir)
+ tearDownMQTT()
+ }
- // TODO: Actually test receiving data
+ test("mqtt input stream") {
+ val sendMessage = "MQTT demo for spark streaming"
+ val receiveStream: ReceiverInputDStream[String] =
+ MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
+ var receiveMessage: List[String] = List()
+ receiveStream.foreachRDD { rdd =>
+ if (rdd.collect.length > 0) {
+ receiveMessage = receiveMessage ::: List(rdd.first)
+ receiveMessage
+ }
+ }
+ ssc.start()
+ publishData(sendMessage)
+ eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+ assert(sendMessage.equals(receiveMessage(0)))
+ }
ssc.stop()
}
+
+ private def setupMQTT() {
+ broker = new BrokerService()
+ connector = new TransportConnector()
+ connector.setName("mqtt")
+ connector.setUri(new URI("mqtt:" + brokerUri))
+ broker.addConnector(connector)
+ broker.start()
+ }
+
+ private def tearDownMQTT() {
+ if (broker != null) {
+ broker.stop()
+ broker = null
+ }
+ if (connector != null) {
+ connector.stop()
+ connector = null
+ }
+ }
+
+ private def findFreePort(): Int = {
+ Utils.startServiceOnPort(23456, (trialPort: Int) => {
+ val socket = new ServerSocket(trialPort)
+ socket.close()
+ (null, trialPort)
+ })._2
+ }
+
+ def publishData(data: String): Unit = {
+ var client: MqttClient = null
+ try {
+ val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
+ client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
+ client.connect()
+ if (client.isConnected) {
+ val msgTopic: MqttTopic = client.getTopic(topic)
+ val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
+ message.setQos(1)
+ message.setRetained(true)
+ for (i <- 0 to 100)
+ msgTopic.publish(message)
+ }
+ } finally {
+ client.disconnect()
+ client.close()
+ client = null
+ }
+ }
}