aboutsummaryrefslogtreecommitdiff
path: root/external/mqtt/src
diff options
context:
space:
mode:
authorIulian Dragos <jaguarul@gmail.com>2015-02-02 14:00:33 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2015-02-02 14:00:33 -0800
commite908322cd5991e6cbdaaafb8cd494759dac01225 (patch)
tree66ce728fd4de2216dea0009eac5fc5c783a1cec5 /external/mqtt/src
parent683e938242e29a0d584452e5230b4168b85bdab2 (diff)
downloadspark-e908322cd5991e6cbdaaafb8cd494759dac01225.tar.gz
spark-e908322cd5991e6cbdaaafb8cd494759dac01225.tar.bz2
spark-e908322cd5991e6cbdaaafb8cd494759dac01225.zip
[SPARK-4631][streaming][FIX] Wait for a receiver to start before publishing test data.
This fixes two sources of non-deterministic failures in this test: - wait for a receiver to be up before pushing data through MQTT - gracefully handle the case where the MQTT client is overloaded. There’s a hard-coded limit of 10 in-flight messages, and this test may hit it. Instead of crashing, we retry sending the message. Both of these are needed to make the test pass reliably on my machine. Author: Iulian Dragos <jaguarul@gmail.com> Closes #4270 from dragos/issue/fix-flaky-test-SPARK-4631 and squashes the following commits: f66c482 [Iulian Dragos] [SPARK-4631][streaming] Wait for a receiver to start before publishing test data. d408a8e [Iulian Dragos] Install callback before connecting to MQTT broker.
Diffstat (limited to 'external/mqtt/src')
-rw-r--r--external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala26
-rw-r--r--external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala35
2 files changed, 46 insertions, 15 deletions
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 77661f71ad..1ef91dd492 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -55,14 +55,14 @@ class MQTTInputDStream(
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
- ) extends ReceiverInputDStream[String](ssc_) with Logging {
-
+ ) extends ReceiverInputDStream[String](ssc_) {
+
def getReceiver(): Receiver[String] = {
new MQTTReceiver(brokerUrl, topic, storageLevel)
}
}
-private[streaming]
+private[streaming]
class MQTTReceiver(
brokerUrl: String,
topic: String,
@@ -72,21 +72,15 @@ class MQTTReceiver(
def onStop() {
}
-
+
def onStart() {
- // Set up persistence for messages
+ // Set up persistence for messages
val persistence = new MemoryPersistence()
// Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
- // Connect to MqttBroker
- client.connect()
-
- // Subscribe to Mqtt topic
- client.subscribe(topic)
-
// Callback automatically triggers as and when new message arrives on specified topic
val callback: MqttCallback = new MqttCallback() {
@@ -103,7 +97,15 @@ class MQTTReceiver(
}
}
- // Set up callback for MqttClient
+ // Set up callback for MqttClient. This needs to happen before
+ // connecting or subscribing, otherwise messages may be lost
client.setCallback(callback)
+
+ // Connect to MqttBroker
+ client.connect()
+
+ // Subscribe to Mqtt topic
+ client.subscribe(topic)
+
}
}
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index fe53a29cba..e84adc088a 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -18,6 +18,8 @@
package org.apache.spark.streaming.mqtt
import java.net.{URI, ServerSocket}
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -32,6 +34,8 @@ import org.scalatest.concurrent.Eventually
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.scheduler.StreamingListener
+import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
import org.apache.spark.SparkConf
import org.apache.spark.util.Utils
@@ -67,7 +71,7 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
val sendMessage = "MQTT demo for spark streaming"
val receiveStream: ReceiverInputDStream[String] =
MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
- var receiveMessage: List[String] = List()
+ @volatile var receiveMessage: List[String] = List()
receiveStream.foreachRDD { rdd =>
if (rdd.collect.length > 0) {
receiveMessage = receiveMessage ::: List(rdd.first)
@@ -75,6 +79,11 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
}
}
ssc.start()
+
+ // wait for the receiver to start before publishing data, or we risk failing
+ // the test nondeterministically. See SPARK-4631
+ waitForReceiverToStart()
+
publishData(sendMessage)
eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
assert(sendMessage.equals(receiveMessage(0)))
@@ -121,8 +130,14 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
message.setQos(1)
message.setRetained(true)
- for (i <- 0 to 100) {
- msgTopic.publish(message)
+
+ for (i <- 0 to 10) {
+ try {
+ msgTopic.publish(message)
+ } catch {
+ case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
+ Thread.sleep(50) // wait for Spark streaming to consume something from the message queue
+ }
}
}
} finally {
@@ -131,4 +146,18 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
client = null
}
}
+
+ /**
+ * Block until at least one receiver has started or timeout occurs.
+ */
+ private def waitForReceiverToStart() = {
+ val latch = new CountDownLatch(1)
+ ssc.addStreamingListener(new StreamingListener {
+ override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
+ latch.countDown()
+ }
+ })
+
+ assert(latch.await(10, TimeUnit.SECONDS), "Timeout waiting for receiver to start.")
+ }
}