aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorprabs <prabsmails@gmail.com>2015-02-25 14:37:35 +0000
committerSean Owen <sowen@cloudera.com>2015-02-25 14:37:35 +0000
commitd51ed263ee791967380de6b9c892985ce87f6fcb (patch)
treeb02c8d420ac6da9590a859b024ac3f82422d4f8b /external
parentd641fbb39c90b1d734cc55396ca43d7e98788975 (diff)
downloadspark-d51ed263ee791967380de6b9c892985ce87f6fcb.tar.gz
spark-d51ed263ee791967380de6b9c892985ce87f6fcb.tar.bz2
spark-d51ed263ee791967380de6b9c892985ce87f6fcb.zip
[SPARK-5666][streaming][MQTT streaming] some trivial fixes
modified to adhere to accepted coding standards as pointed by tdas in PR #3844 Author: prabs <prabsmails@gmail.com> Author: Prabeesh K <prabsmails@gmail.com> Closes #4178 from prabeesh/master and squashes the following commits: bd2cb49 [Prabeesh K] adress the comment ccc0765 [prabs] adress the comment 46f9619 [prabs] adress the comment c035bdc [prabs] adress the comment 22dd7f7 [prabs] address the comments 0cc67bd [prabs] adress the comment 838c38e [prabs] adress the comment cd57029 [prabs] address the comments 66919a3 [Prabeesh K] changed MqttDefaultFilePersistence to MemoryPersistence 5857989 [prabs] modified to adhere to accepted coding standards
Diffstat (limited to 'external')
-rw-r--r--external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala26
-rw-r--r--external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala3
-rw-r--r--external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala12
3 files changed, 21 insertions, 20 deletions
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 1ef91dd492..3c0ef94cb0 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -17,23 +17,23 @@
package org.apache.spark.streaming.mqtt
+import java.io.IOException
+import java.util.concurrent.Executors
+import java.util.Properties
+
+import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.mutable.HashMap
-import scala.collection.JavaConversions._
import scala.reflect.ClassTag
-import java.util.Properties
-import java.util.concurrent.Executors
-import java.io.IOException
-
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
import org.eclipse.paho.client.mqttv3.MqttCallback
import org.eclipse.paho.client.mqttv3.MqttClient
import org.eclipse.paho.client.mqttv3.MqttClientPersistence
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
import org.eclipse.paho.client.mqttv3.MqttException
import org.eclipse.paho.client.mqttv3.MqttMessage
import org.eclipse.paho.client.mqttv3.MqttTopic
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
@@ -82,18 +82,18 @@ class MQTTReceiver(
val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
// Callback automatically triggers as and when new message arrives on specified topic
- val callback: MqttCallback = new MqttCallback() {
+ val callback = new MqttCallback() {
// Handles Mqtt message
- override def messageArrived(arg0: String, arg1: MqttMessage) {
- store(new String(arg1.getPayload(),"utf-8"))
+ override def messageArrived(topic: String, message: MqttMessage) {
+ store(new String(message.getPayload(),"utf-8"))
}
- override def deliveryComplete(arg0: IMqttDeliveryToken) {
+ override def deliveryComplete(token: IMqttDeliveryToken) {
}
- override def connectionLost(arg0: Throwable) {
- restart("Connection lost ", arg0)
+ override def connectionLost(cause: Throwable) {
+ restart("Connection lost ", cause)
}
}
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index c5ffe51f99..1142d0f56b 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -17,10 +17,11 @@
package org.apache.spark.streaming.mqtt
+import scala.reflect.ClassTag
+
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream}
-import scala.reflect.ClassTag
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
object MQTTUtils {
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 19c9271af7..0f3298af62 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -42,8 +42,8 @@ import org.apache.spark.util.Utils
class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
private val batchDuration = Milliseconds(500)
- private val master: String = "local[2]"
- private val framework: String = this.getClass.getSimpleName
+ private val master = "local[2]"
+ private val framework = this.getClass.getSimpleName
private val freePort = findFreePort()
private val brokerUri = "//localhost:" + freePort
private val topic = "def"
@@ -69,7 +69,7 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
test("mqtt input stream") {
val sendMessage = "MQTT demo for spark streaming"
- val receiveStream: ReceiverInputDStream[String] =
+ val receiveStream =
MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
@volatile var receiveMessage: List[String] = List()
receiveStream.foreachRDD { rdd =>
@@ -123,12 +123,12 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
def publishData(data: String): Unit = {
var client: MqttClient = null
try {
- val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
+ val persistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
client.connect()
if (client.isConnected) {
- val msgTopic: MqttTopic = client.getTopic(topic)
- val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
+ val msgTopic = client.getTopic(topic)
+ val message = new MqttMessage(data.getBytes("utf-8"))
message.setQos(1)
message.setRetained(true)