aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorPrabeesh K <prabsmails@gmail.com>2013-10-18 17:00:28 +0530
committerPrabeesh K <prabsmails@gmail.com>2013-10-18 17:00:28 +0530
commit6ec39829e9204c742e364d48c23e106625bba17d (patch)
treedc7bbd9a34ead6f2db33db79f2ca616e4f2478ea /examples
parentd223d38933b440df372dce38c6f4181586011c9e (diff)
downloadspark-6ec39829e9204c742e364d48c23e106625bba17d.tar.gz
spark-6ec39829e9204c742e364d48c23e106625bba17d.tar.bz2
spark-6ec39829e9204c742e364d48c23e106625bba17d.zip
Update MQTTWordCount.scala
Diffstat (limited to 'examples')
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala29
1 files changed, 14 insertions, 15 deletions
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
index 04e21bef5e..be6587b316 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
@@ -22,12 +22,12 @@ import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.MQTTReceiver
import org.apache.spark.storage.StorageLevel
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
-import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.eclipse.paho.client.mqttv3.MqttTopic;
+import org.eclipse.paho.client.mqttv3.MqttClient
+import org.eclipse.paho.client.mqttv3.MqttClientPersistence
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
+import org.eclipse.paho.client.mqttv3.MqttException
+import org.eclipse.paho.client.mqttv3.MqttMessage
+import org.eclipse.paho.client.mqttv3.MqttTopic
/**
* A simple Mqtt publisher for demonstration purposes, repeatedly publishes Space separated String Message "hello mqtt demo for spark streaming"
@@ -47,24 +47,24 @@ object MQTTPublisher {
val Seq(brokerUrl, topic) = args.toSeq
try {
- var peristance:MqttClientPersistence =new MqttDefaultFilePersistence("/tmp");
- client = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance);
+ var peristance:MqttClientPersistence =new MqttDefaultFilePersistence("/tmp")
+ client = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance)
} catch {
- case e: MqttException => println("Exception Caught: " + e);
+ case e: MqttException => println("Exception Caught: " + e)
}
- client.connect();
+ client.connect()
val msgtopic: MqttTopic = client.getTopic(topic);
- val msg: String = "hello mqtt demo for spark streaming";
+ val msg: String = "hello mqtt demo for spark streaming"
while (true) {
- val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes());
+ val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes())
msgtopic.publish(message);
- println("Published data. topic: " + msgtopic.getName() + " Message: " + message);
+ println("Published data. topic: " + msgtopic.getName() + " Message: " + message)
}
- client.disconnect();
+ client.disconnect()
}
}
@@ -109,4 +109,3 @@ object MQTTWordCount {
ssc.start()
}
}
-