aboutsummaryrefslogtreecommitdiff
path: root/external/mqtt/src/main/scala/org
diff options
context:
space:
mode:
authorIulian Dragos <jaguarul@gmail.com>2015-02-02 14:00:33 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2015-02-02 14:00:33 -0800
commite908322cd5991e6cbdaaafb8cd494759dac01225 (patch)
tree66ce728fd4de2216dea0009eac5fc5c783a1cec5 /external/mqtt/src/main/scala/org
parent683e938242e29a0d584452e5230b4168b85bdab2 (diff)
downloadspark-e908322cd5991e6cbdaaafb8cd494759dac01225.tar.gz
spark-e908322cd5991e6cbdaaafb8cd494759dac01225.tar.bz2
spark-e908322cd5991e6cbdaaafb8cd494759dac01225.zip
[SPARK-4631][streaming][FIX] Wait for a receiver to start before publishing test data.
This fixes two sources of non-deterministic failures in this test: - wait for a receiver to be up before pushing data through MQTT - gracefully handle the case where the MQTT client is overloaded. There’s a hard-coded limit of 10 in-flight messages, and this test may hit it. Instead of crashing, we retry sending the message. Both of these are needed to make the test pass reliably on my machine. Author: Iulian Dragos <jaguarul@gmail.com> Closes #4270 from dragos/issue/fix-flaky-test-SPARK-4631 and squashes the following commits: f66c482 [Iulian Dragos] [SPARK-4631][streaming] Wait for a receiver to start before publishing test data. d408a8e [Iulian Dragos] Install callback before connecting to MQTT broker.
Diffstat (limited to 'external/mqtt/src/main/scala/org')
-rw-r--r--external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala26
1 files changed, 14 insertions, 12 deletions
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 77661f71ad..1ef91dd492 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -55,14 +55,14 @@ class MQTTInputDStream(
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
- ) extends ReceiverInputDStream[String](ssc_) with Logging {
-
+ ) extends ReceiverInputDStream[String](ssc_) {
+
def getReceiver(): Receiver[String] = {
new MQTTReceiver(brokerUrl, topic, storageLevel)
}
}
-private[streaming]
+private[streaming]
class MQTTReceiver(
brokerUrl: String,
topic: String,
@@ -72,21 +72,15 @@ class MQTTReceiver(
def onStop() {
}
-
+
def onStart() {
- // Set up persistence for messages
+ // Set up persistence for messages
val persistence = new MemoryPersistence()
// Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
- // Connect to MqttBroker
- client.connect()
-
- // Subscribe to Mqtt topic
- client.subscribe(topic)
-
// Callback automatically triggers as and when new message arrives on specified topic
val callback: MqttCallback = new MqttCallback() {
@@ -103,7 +97,15 @@ class MQTTReceiver(
}
}
- // Set up callback for MqttClient
+ // Set up callback for MqttClient. This needs to happen before
+ // connecting or subscribing, otherwise messages may be lost
client.setCallback(callback)
+
+ // Connect to MqttBroker
+ client.connect()
+
+ // Subscribe to Mqtt topic
+ client.subscribe(topic)
+
}
}