aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala49
-rw-r--r--external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala26
-rw-r--r--external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala3
-rw-r--r--external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala12
4 files changed, 50 insertions, 40 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
index 6ff0c47793..f40caad322 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
@@ -17,8 +17,8 @@
package org.apache.spark.examples.streaming
-import org.eclipse.paho.client.mqttv3.{MqttClient, MqttClientPersistence, MqttException, MqttMessage, MqttTopic}
-import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
@@ -31,8 +31,6 @@ import org.apache.spark.SparkConf
*/
object MQTTPublisher {
- var client: MqttClient = _
-
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: MQTTPublisher <MqttBrokerUrl> <topic>")
@@ -42,25 +40,36 @@ object MQTTPublisher {
StreamingExamples.setStreamingLogLevels()
val Seq(brokerUrl, topic) = args.toSeq
+
+ var client: MqttClient = null
try {
- var peristance:MqttClientPersistence =new MqttDefaultFilePersistence("/tmp")
- client = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance)
+ val persistence = new MemoryPersistence()
+ client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
+
+ client.connect()
+
+ val msgtopic = client.getTopic(topic)
+ val msgContent = "hello mqtt demo for spark streaming"
+ val message = new MqttMessage(msgContent.getBytes("utf-8"))
+
+ while (true) {
+ try {
+ msgtopic.publish(message)
+ println(s"Published data. topic: {msgtopic.getName()}; Message: {message}")
+ } catch {
+ case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
+ Thread.sleep(10)
+ println("Queue is full, wait for to consume data from the message queue")
+ }
+ }
} catch {
case e: MqttException => println("Exception Caught: " + e)
+ } finally {
+ if (client != null) {
+ client.disconnect()
+ }
}
-
- client.connect()
-
- val msgtopic: MqttTopic = client.getTopic(topic)
- val msg: String = "hello mqtt demo for spark streaming"
-
- while (true) {
- val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes("utf-8"))
- msgtopic.publish(message)
- println("Published data. topic: " + msgtopic.getName() + " Message: " + message)
- }
- client.disconnect()
}
}
@@ -96,9 +105,9 @@ object MQTTWordCount {
val sparkConf = new SparkConf().setAppName("MQTTWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2)
-
- val words = lines.flatMap(x => x.toString.split(" "))
+ val words = lines.flatMap(x => x.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+
wordCounts.print()
ssc.start()
ssc.awaitTermination()
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 1ef91dd492..3c0ef94cb0 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -17,23 +17,23 @@
package org.apache.spark.streaming.mqtt
+import java.io.IOException
+import java.util.concurrent.Executors
+import java.util.Properties
+
+import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.mutable.HashMap
-import scala.collection.JavaConversions._
import scala.reflect.ClassTag
-import java.util.Properties
-import java.util.concurrent.Executors
-import java.io.IOException
-
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
import org.eclipse.paho.client.mqttv3.MqttCallback
import org.eclipse.paho.client.mqttv3.MqttClient
import org.eclipse.paho.client.mqttv3.MqttClientPersistence
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
import org.eclipse.paho.client.mqttv3.MqttException
import org.eclipse.paho.client.mqttv3.MqttMessage
import org.eclipse.paho.client.mqttv3.MqttTopic
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
@@ -82,18 +82,18 @@ class MQTTReceiver(
val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
// Callback automatically triggers as and when new message arrives on specified topic
- val callback: MqttCallback = new MqttCallback() {
+ val callback = new MqttCallback() {
// Handles Mqtt message
- override def messageArrived(arg0: String, arg1: MqttMessage) {
- store(new String(arg1.getPayload(),"utf-8"))
+ override def messageArrived(topic: String, message: MqttMessage) {
+ store(new String(message.getPayload(),"utf-8"))
}
- override def deliveryComplete(arg0: IMqttDeliveryToken) {
+ override def deliveryComplete(token: IMqttDeliveryToken) {
}
- override def connectionLost(arg0: Throwable) {
- restart("Connection lost ", arg0)
+ override def connectionLost(cause: Throwable) {
+ restart("Connection lost ", cause)
}
}
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index c5ffe51f99..1142d0f56b 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -17,10 +17,11 @@
package org.apache.spark.streaming.mqtt
+import scala.reflect.ClassTag
+
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream}
-import scala.reflect.ClassTag
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
object MQTTUtils {
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 19c9271af7..0f3298af62 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -42,8 +42,8 @@ import org.apache.spark.util.Utils
class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
private val batchDuration = Milliseconds(500)
- private val master: String = "local[2]"
- private val framework: String = this.getClass.getSimpleName
+ private val master = "local[2]"
+ private val framework = this.getClass.getSimpleName
private val freePort = findFreePort()
private val brokerUri = "//localhost:" + freePort
private val topic = "def"
@@ -69,7 +69,7 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
test("mqtt input stream") {
val sendMessage = "MQTT demo for spark streaming"
- val receiveStream: ReceiverInputDStream[String] =
+ val receiveStream =
MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
@volatile var receiveMessage: List[String] = List()
receiveStream.foreachRDD { rdd =>
@@ -123,12 +123,12 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
def publishData(data: String): Unit = {
var client: MqttClient = null
try {
- val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
+ val persistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
client.connect()
if (client.isConnected) {
- val msgTopic: MqttTopic = client.getTopic(topic)
- val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
+ val msgTopic = client.getTopic(topic)
+ val message = new MqttMessage(data.getBytes("utf-8"))
message.setQos(1)
message.setRetained(true)