diff options
author | prabs <prabsmails@gmail.com> | 2015-02-25 14:37:35 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2015-02-25 14:37:35 +0000 |
commit | d51ed263ee791967380de6b9c892985ce87f6fcb (patch) | |
tree | b02c8d420ac6da9590a859b024ac3f82422d4f8b | |
parent | d641fbb39c90b1d734cc55396ca43d7e98788975 (diff) | |
download | spark-d51ed263ee791967380de6b9c892985ce87f6fcb.tar.gz spark-d51ed263ee791967380de6b9c892985ce87f6fcb.tar.bz2 spark-d51ed263ee791967380de6b9c892985ce87f6fcb.zip |
[SPARK-5666][streaming][MQTT streaming] some trivial fixes
modified to adhere to accepted coding standards as pointed by tdas in PR #3844
Author: prabs <prabsmails@gmail.com>
Author: Prabeesh K <prabsmails@gmail.com>
Closes #4178 from prabeesh/master and squashes the following commits:
bd2cb49 [Prabeesh K] adress the comment
ccc0765 [prabs] adress the comment
46f9619 [prabs] adress the comment
c035bdc [prabs] adress the comment
22dd7f7 [prabs] address the comments
0cc67bd [prabs] adress the comment
838c38e [prabs] adress the comment
cd57029 [prabs] address the comments
66919a3 [Prabeesh K] changed MqttDefaultFilePersistence to MemoryPersistence
5857989 [prabs] modified to adhere to accepted coding standards
4 files changed, 50 insertions, 40 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala index 6ff0c47793..f40caad322 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala @@ -17,8 +17,8 @@ package org.apache.spark.examples.streaming -import org.eclipse.paho.client.mqttv3.{MqttClient, MqttClientPersistence, MqttException, MqttMessage, MqttTopic} -import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} @@ -31,8 +31,6 @@ import org.apache.spark.SparkConf */ object MQTTPublisher { - var client: MqttClient = _ - def main(args: Array[String]) { if (args.length < 2) { System.err.println("Usage: MQTTPublisher <MqttBrokerUrl> <topic>") @@ -42,25 +40,36 @@ object MQTTPublisher { StreamingExamples.setStreamingLogLevels() val Seq(brokerUrl, topic) = args.toSeq + + var client: MqttClient = null try { - var peristance:MqttClientPersistence =new MqttDefaultFilePersistence("/tmp") - client = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance) + val persistence = new MemoryPersistence() + client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence) + + client.connect() + + val msgtopic = client.getTopic(topic) + val msgContent = "hello mqtt demo for spark streaming" + val message = new MqttMessage(msgContent.getBytes("utf-8")) + + while (true) { + try { + msgtopic.publish(message) + println(s"Published data. topic: {msgtopic.getName()}; Message: {message}") + } catch { + case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT => + Thread.sleep(10) + println("Queue is full, wait for to consume data from the message queue") + } + } } catch { case e: MqttException => println("Exception Caught: " + e) + } finally { + if (client != null) { + client.disconnect() + } } - - client.connect() - - val msgtopic: MqttTopic = client.getTopic(topic) - val msg: String = "hello mqtt demo for spark streaming" - - while (true) { - val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes("utf-8")) - msgtopic.publish(message) - println("Published data. topic: " + msgtopic.getName() + " Message: " + message) - } - client.disconnect() } } @@ -96,9 +105,9 @@ object MQTTWordCount { val sparkConf = new SparkConf().setAppName("MQTTWordCount") val ssc = new StreamingContext(sparkConf, Seconds(2)) val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2) - - val words = lines.flatMap(x => x.toString.split(" ")) + val words = lines.flatMap(x => x.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + wordCounts.print() ssc.start() ssc.awaitTermination() diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala index 1ef91dd492..3c0ef94cb0 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala @@ -17,23 +17,23 @@ package org.apache.spark.streaming.mqtt +import java.io.IOException +import java.util.concurrent.Executors +import java.util.Properties + +import scala.collection.JavaConversions._ import scala.collection.Map import scala.collection.mutable.HashMap -import scala.collection.JavaConversions._ import scala.reflect.ClassTag -import java.util.Properties -import java.util.concurrent.Executors -import java.io.IOException - +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken import org.eclipse.paho.client.mqttv3.MqttCallback import org.eclipse.paho.client.mqttv3.MqttClient import org.eclipse.paho.client.mqttv3.MqttClientPersistence -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken import org.eclipse.paho.client.mqttv3.MqttException import org.eclipse.paho.client.mqttv3.MqttMessage import org.eclipse.paho.client.mqttv3.MqttTopic +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence import org.apache.spark.Logging import org.apache.spark.storage.StorageLevel @@ -82,18 +82,18 @@ class MQTTReceiver( val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence) // Callback automatically triggers as and when new message arrives on specified topic - val callback: MqttCallback = new MqttCallback() { + val callback = new MqttCallback() { // Handles Mqtt message - override def messageArrived(arg0: String, arg1: MqttMessage) { - store(new String(arg1.getPayload(),"utf-8")) + override def messageArrived(topic: String, message: MqttMessage) { + store(new String(message.getPayload(),"utf-8")) } - override def deliveryComplete(arg0: IMqttDeliveryToken) { + override def deliveryComplete(token: IMqttDeliveryToken) { } - override def connectionLost(arg0: Throwable) { - restart("Connection lost ", arg0) + override def connectionLost(cause: Throwable) { + restart("Connection lost ", cause) } } diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala index c5ffe51f99..1142d0f56b 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala @@ -17,10 +17,11 @@ package org.apache.spark.streaming.mqtt +import scala.reflect.ClassTag + import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream} -import scala.reflect.ClassTag import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream} object MQTTUtils { diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index 19c9271af7..0f3298af62 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -42,8 +42,8 @@ import org.apache.spark.util.Utils class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { private val batchDuration = Milliseconds(500) - private val master: String = "local[2]" - private val framework: String = this.getClass.getSimpleName + private val master = "local[2]" + private val framework = this.getClass.getSimpleName private val freePort = findFreePort() private val brokerUri = "//localhost:" + freePort private val topic = "def" @@ -69,7 +69,7 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { test("mqtt input stream") { val sendMessage = "MQTT demo for spark streaming" - val receiveStream: ReceiverInputDStream[String] = + val receiveStream = MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY) @volatile var receiveMessage: List[String] = List() receiveStream.foreachRDD { rdd => @@ -123,12 +123,12 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { def publishData(data: String): Unit = { var client: MqttClient = null try { - val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath) + val persistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath) client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence) client.connect() if (client.isConnected) { - val msgTopic: MqttTopic = client.getTopic(topic) - val message: MqttMessage = new MqttMessage(data.getBytes("utf-8")) + val msgTopic = client.getTopic(topic) + val message = new MqttMessage(data.getBytes("utf-8")) message.setQos(1) message.setRetained(true) |