aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
Diffstat (limited to 'examples')
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala49
1 files changed, 29 insertions, 20 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
index 6ff0c47793..f40caad322 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
@@ -17,8 +17,8 @@
package org.apache.spark.examples.streaming
-import org.eclipse.paho.client.mqttv3.{MqttClient, MqttClientPersistence, MqttException, MqttMessage, MqttTopic}
-import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
@@ -31,8 +31,6 @@ import org.apache.spark.SparkConf
*/
object MQTTPublisher {
- var client: MqttClient = _
-
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: MQTTPublisher <MqttBrokerUrl> <topic>")
@@ -42,25 +40,36 @@ object MQTTPublisher {
StreamingExamples.setStreamingLogLevels()
val Seq(brokerUrl, topic) = args.toSeq
+
+ var client: MqttClient = null
try {
- var peristance:MqttClientPersistence =new MqttDefaultFilePersistence("/tmp")
- client = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance)
+ val persistence = new MemoryPersistence()
+ client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
+
+ client.connect()
+
+ val msgtopic = client.getTopic(topic)
+ val msgContent = "hello mqtt demo for spark streaming"
+ val message = new MqttMessage(msgContent.getBytes("utf-8"))
+
+ while (true) {
+ try {
+ msgtopic.publish(message)
+ println(s"Published data. topic: {msgtopic.getName()}; Message: {message}")
+ } catch {
+ case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
+ Thread.sleep(10)
+ println("Queue is full, wait for to consume data from the message queue")
+ }
+ }
} catch {
case e: MqttException => println("Exception Caught: " + e)
+ } finally {
+ if (client != null) {
+ client.disconnect()
+ }
}
-
- client.connect()
-
- val msgtopic: MqttTopic = client.getTopic(topic)
- val msg: String = "hello mqtt demo for spark streaming"
-
- while (true) {
- val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes("utf-8"))
- msgtopic.publish(message)
- println("Published data. topic: " + msgtopic.getName() + " Message: " + message)
- }
- client.disconnect()
}
}
@@ -96,9 +105,9 @@ object MQTTWordCount {
val sparkConf = new SparkConf().setAppName("MQTTWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2)
-
- val words = lines.flatMap(x => x.toString.split(" "))
+ val words = lines.flatMap(x => x.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+
wordCounts.print()
ssc.start()
ssc.awaitTermination()