aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorprabeesh <prabsmails@gmail.com>2013-10-17 10:00:40 +0530
committerprabeesh <prabsmails@gmail.com>2013-10-17 10:00:40 +0530
commit890f8fe4393a20749e0a6cfd57ff07f60cfad2a1 (patch)
tree33723610f68817143f833b5f0078e776a4a6911d /streaming
parentee4178f144d7752092da53ceea686fbb6c37d5db (diff)
downloadspark-890f8fe4393a20749e0a6cfd57ff07f60cfad2a1.tar.gz
spark-890f8fe4393a20749e0a6cfd57ff07f60cfad2a1.tar.bz2
spark-890f8fe4393a20749e0a6cfd57ff07f60cfad2a1.zip
modify code, use Spark Logging Class
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala61
1 files changed, 26 insertions, 35 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala
index 3416989c02..9b3fe67e6a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala
@@ -23,16 +23,16 @@ import org.apache.spark.streaming.{ Time, DStreamCheckpointData, StreamingContex
import java.util.Properties
import java.util.concurrent.Executors
-import java.io.IOException;
+import java.io.IOException
-import org.eclipse.paho.client.mqttv3.MqttCallback;
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.eclipse.paho.client.mqttv3.MqttTopic;
+import org.eclipse.paho.client.mqttv3.MqttCallback
+import org.eclipse.paho.client.mqttv3.MqttClient
+import org.eclipse.paho.client.mqttv3.MqttClientPersistence
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
+import org.eclipse.paho.client.mqttv3.MqttException
+import org.eclipse.paho.client.mqttv3.MqttMessage
+import org.eclipse.paho.client.mqttv3.MqttTopic
import scala.collection.Map
import scala.collection.mutable.HashMap
@@ -50,9 +50,7 @@ private[streaming] class MQTTInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
brokerUrl: String,
topic: String,
-
storageLevel: StorageLevel) extends NetworkInputDStream[T](ssc_) with Logging {
-
def getReceiver(): NetworkReceiver[T] = {
new MQTTReceiver(brokerUrl, topic, storageLevel)
.asInstanceOf[NetworkReceiver[T]]
@@ -62,50 +60,43 @@ private[streaming] class MQTTInputDStream[T: ClassManifest](
private[streaming] class MQTTReceiver(brokerUrl: String,
topic: String,
storageLevel: StorageLevel) extends NetworkReceiver[Any] {
-
lazy protected val blockGenerator = new BlockGenerator(storageLevel)
-
def onStop() {
blockGenerator.stop()
}
-
def onStart() {
blockGenerator.start()
- //Set up persistence for messages
- var peristance:MqttClientPersistence =new MemoryPersistence();
+ // Set up persistence for messages
+ var peristance: MqttClientPersistence = new MemoryPersistence()
+
+ // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
+ var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance)
- //Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
- var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance);
+ // Connect to MqttBroker
+ client.connect()
- //Connect to MqttBroker
- client.connect();
-
- //Subscribe to Mqtt topic
- client.subscribe(topic);
-
- //Callback automatically triggers as and when new message arrives on specified topic
+ // Subscribe to Mqtt topic
+ client.subscribe(topic)
+
+ // Callback automatically triggers as and when new message arrives on specified topic
var callback: MqttCallback = new MqttCallback() {
- //Handles Mqtt message
+ // Handles Mqtt message
override def messageArrived(arg0: String, arg1: MqttMessage) {
blockGenerator += new String(arg1.getPayload())
}
-
+
override def deliveryComplete(arg0: IMqttDeliveryToken) {
}
override def connectionLost(arg0: Throwable) {
- System.err.println("Connection lost " + arg0)
-
+ logInfo("Connection lost " + arg0)
}
+ }
- };
-
- //Set up callback for MqttClient
- client.setCallback(callback);
-
+ // Set up callback for MqttClient
+ client.setCallback(callback)
}
-
}