From dbafa11396d7c1619f5523fba5ae6abed07e90d9 Mon Sep 17 00:00:00 2001 From: Prabeesh K Date: Tue, 22 Oct 2013 08:50:34 +0530 Subject: Update MQTTWordCount.scala --- .../scala/org/apache/spark/streaming/examples/MQTTWordCount.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'examples/src/main/scala') diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala index be6587b316..7d06505df7 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala @@ -30,7 +30,8 @@ import org.eclipse.paho.client.mqttv3.MqttMessage import org.eclipse.paho.client.mqttv3.MqttTopic /** - * A simple Mqtt publisher for demonstration purposes, repeatedly publishes Space separated String Message "hello mqtt demo for spark streaming" + * A simple Mqtt publisher for demonstration purposes, repeatedly publishes + * Space separated String Message "hello mqtt demo for spark streaming" */ object MQTTPublisher { @@ -99,13 +100,13 @@ object MQTTWordCount { val Seq(master, brokerUrl, topic) = args.toSeq - val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) + val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"), + Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val lines = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_ONLY) val words = lines.flatMap(x => x.toString.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() - ssc.start() } } -- cgit v1.2.3