aboutsummaryrefslogtreecommitdiff
path: root/examples/src/main/scala
diff options
context:
space:
mode:
Diffstat (limited to 'examples/src/main/scala')
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala7
1 files changed, 4 insertions, 3 deletions
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
index be6587b316..7d06505df7 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
@@ -30,7 +30,8 @@ import org.eclipse.paho.client.mqttv3.MqttMessage
import org.eclipse.paho.client.mqttv3.MqttTopic
/**
- * A simple Mqtt publisher for demonstration purposes, repeatedly publishes Space separated String Message "hello mqtt demo for spark streaming"
+ * A simple Mqtt publisher for demonstration purposes, repeatedly publishes
+ * Space separated String Message "hello mqtt demo for spark streaming"
*/
object MQTTPublisher {
@@ -99,13 +100,13 @@ object MQTTWordCount {
val Seq(master, brokerUrl, topic) = args.toSeq
- val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"),
+ Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val lines = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_ONLY)
val words = lines.flatMap(x => x.toString.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
-
ssc.start()
}
}