about summary refs log tree commit diff
path: root/examples
diff options
context:
space:
mode:
authorprabs <prabsmails@gmail.com>2015-02-25 14:37:35 +0000
committerSean Owen <sowen@cloudera.com>2015-02-25 14:37:35 +0000
commitd51ed263ee791967380de6b9c892985ce87f6fcb (patch)
treeb02c8d420ac6da9590a859b024ac3f82422d4f8b /examples
parentd641fbb39c90b1d734cc55396ca43d7e98788975 (diff)
downloadspark-d51ed263ee791967380de6b9c892985ce87f6fcb.tar.gz
spark-d51ed263ee791967380de6b9c892985ce87f6fcb.tar.bz2
spark-d51ed263ee791967380de6b9c892985ce87f6fcb.zip
[SPARK-5666][streaming][MQTT streaming] some trivial fixes
modified to adhere to accepted coding standards as pointed by tdas in PR #3844 Author: prabs <prabsmails@gmail.com> Author: Prabeesh K <prabsmails@gmail.com> Closes #4178 from prabeesh/master and squashes the following commits: bd2cb49 [Prabeesh K] adress the comment ccc0765 [prabs] adress the comment 46f9619 [prabs] adress the comment c035bdc [prabs] adress the comment 22dd7f7 [prabs] address the comments 0cc67bd [prabs] adress the comment 838c38e [prabs] adress the comment cd57029 [prabs] address the comments 66919a3 [Prabeesh K] changed MqttDefaultFilePersistence to MemoryPersistence 5857989 [prabs] modified to adhere to accepted coding standards
Diffstat (limited to 'examples')
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala49
1 files changed, 29 insertions, 20 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
index 6ff0c47793..f40caad322 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
@@ -17,8 +17,8 @@
package org.apache.spark.examples.streaming
-import org.eclipse.paho.client.mqttv3.{MqttClient, MqttClientPersistence, MqttException, MqttMessage, MqttTopic}
-import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
@@ -31,8 +31,6 @@ import org.apache.spark.SparkConf
*/
object MQTTPublisher {
- var client: MqttClient = _
-
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: MQTTPublisher <MqttBrokerUrl> <topic>")
@@ -42,25 +40,36 @@ object MQTTPublisher {
StreamingExamples.setStreamingLogLevels()
val Seq(brokerUrl, topic) = args.toSeq
+
+ var client: MqttClient = null
try {
- var peristance:MqttClientPersistence =new MqttDefaultFilePersistence("/tmp")
- client = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance)
+ val persistence = new MemoryPersistence()
+ client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
+
+ client.connect()
+
+ val msgtopic = client.getTopic(topic)
+ val msgContent = "hello mqtt demo for spark streaming"
+ val message = new MqttMessage(msgContent.getBytes("utf-8"))
+
+ while (true) {
+ try {
+ msgtopic.publish(message)
+ println(s"Published data. topic: {msgtopic.getName()}; Message: {message}")
+ } catch {
+ case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
+ Thread.sleep(10)
+ println("Queue is full, wait for to consume data from the message queue")
+ }
+ }
} catch {
case e: MqttException => println("Exception Caught: " + e)
+ } finally {
+ if (client != null) {
+ client.disconnect()
+ }
}
-
- client.connect()
-
- val msgtopic: MqttTopic = client.getTopic(topic)
- val msg: String = "hello mqtt demo for spark streaming"
-
- while (true) {
- val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes("utf-8"))
- msgtopic.publish(message)
- println("Published data. topic: " + msgtopic.getName() + " Message: " + message)
- }
- client.disconnect()
}
}
@@ -96,9 +105,9 @@ object MQTTWordCount {
val sparkConf = new SparkConf().setAppName("MQTTWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2)
-
- val words = lines.flatMap(x => x.toString.split(" "))
+ val words = lines.flatMap(x => x.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+
wordCounts.print()
ssc.start()
ssc.awaitTermination()