diff options
author | Prabeesh K <prabsmails@gmail.com> | 2013-10-18 17:00:28 +0530 |
---|---|---|
committer | Prabeesh K <prabsmails@gmail.com> | 2013-10-18 17:00:28 +0530 |
commit | 6ec39829e9204c742e364d48c23e106625bba17d (patch) | |
tree | dc7bbd9a34ead6f2db33db79f2ca616e4f2478ea /examples/src | |
parent | d223d38933b440df372dce38c6f4181586011c9e (diff) | |
download | spark-6ec39829e9204c742e364d48c23e106625bba17d.tar.gz spark-6ec39829e9204c742e364d48c23e106625bba17d.tar.bz2 spark-6ec39829e9204c742e364d48c23e106625bba17d.zip |
Update MQTTWordCount.scala
Diffstat (limited to 'examples/src')
-rw-r--r-- | examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala | 29 |
1 files changed, 14 insertions, 15 deletions
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala index 04e21bef5e..be6587b316 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala @@ -22,12 +22,12 @@ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream.MQTTReceiver import org.apache.spark.storage.StorageLevel -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttClientPersistence; -import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.eclipse.paho.client.mqttv3.MqttTopic; +import org.eclipse.paho.client.mqttv3.MqttClient +import org.eclipse.paho.client.mqttv3.MqttClientPersistence +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence +import org.eclipse.paho.client.mqttv3.MqttException +import org.eclipse.paho.client.mqttv3.MqttMessage +import org.eclipse.paho.client.mqttv3.MqttTopic /** * A simple Mqtt publisher for demonstration purposes, repeatedly publishes Space separated String Message "hello mqtt demo for spark streaming" @@ -47,24 +47,24 @@ object MQTTPublisher { val Seq(brokerUrl, topic) = args.toSeq try { - var peristance:MqttClientPersistence =new MqttDefaultFilePersistence("/tmp"); - client = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance); + var peristance:MqttClientPersistence =new MqttDefaultFilePersistence("/tmp") + client = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance) } catch { - case e: MqttException => println("Exception Caught: " + e); + case e: MqttException => println("Exception Caught: " + e) } - client.connect(); + client.connect() val msgtopic: MqttTopic = client.getTopic(topic); - val msg: String = "hello mqtt demo for spark streaming"; + val msg: String = "hello mqtt demo for spark streaming" while (true) { - val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes()); + val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes()) msgtopic.publish(message); - println("Published data. topic: " + msgtopic.getName() + " Message: " + message); + println("Published data. topic: " + msgtopic.getName() + " Message: " + message) } - client.disconnect(); + client.disconnect() } } @@ -109,4 +109,3 @@ object MQTTWordCount { ssc.start() } } - |