aboutsummaryrefslogtreecommitdiff
path: root/external/mqtt/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'external/mqtt/src/main')
-rw-r--r--external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala26
1 files changed, 14 insertions, 12 deletions
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 77661f71ad..1ef91dd492 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -55,14 +55,14 @@ class MQTTInputDStream(
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
- ) extends ReceiverInputDStream[String](ssc_) with Logging {
-
+ ) extends ReceiverInputDStream[String](ssc_) {
+
def getReceiver(): Receiver[String] = {
new MQTTReceiver(brokerUrl, topic, storageLevel)
}
}
-private[streaming]
+private[streaming]
class MQTTReceiver(
brokerUrl: String,
topic: String,
@@ -72,21 +72,15 @@ class MQTTReceiver(
def onStop() {
}
-
+
def onStart() {
- // Set up persistence for messages
+ // Set up persistence for messages
val persistence = new MemoryPersistence()
// Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
- // Connect to MqttBroker
- client.connect()
-
- // Subscribe to Mqtt topic
- client.subscribe(topic)
-
// Callback automatically triggers as and when new message arrives on specified topic
val callback: MqttCallback = new MqttCallback() {
@@ -103,7 +97,15 @@ class MQTTReceiver(
}
}
- // Set up callback for MqttClient
+ // Set up callback for MqttClient. This needs to happen before
+ // connecting or subscribing, otherwise messages may be lost
client.setCallback(callback)
+
+ // Connect to MqttBroker
+ client.connect()
+
+ // Subscribe to Mqtt topic
+ client.subscribe(topic)
+
}
}