aboutsummaryrefslogtreecommitdiff
path: root/external/mqtt/src/test
diff options
context:
space:
mode:
authorprabs <prabsmails@gmail.com>2015-02-25 14:37:35 +0000
committerSean Owen <sowen@cloudera.com>2015-02-25 14:37:35 +0000
commitd51ed263ee791967380de6b9c892985ce87f6fcb (patch)
treeb02c8d420ac6da9590a859b024ac3f82422d4f8b /external/mqtt/src/test
parentd641fbb39c90b1d734cc55396ca43d7e98788975 (diff)
downloadspark-d51ed263ee791967380de6b9c892985ce87f6fcb.tar.gz
spark-d51ed263ee791967380de6b9c892985ce87f6fcb.tar.bz2
spark-d51ed263ee791967380de6b9c892985ce87f6fcb.zip
[SPARK-5666][streaming][MQTT streaming] some trivial fixes
modified to adhere to accepted coding standards as pointed by tdas in PR #3844 Author: prabs <prabsmails@gmail.com> Author: Prabeesh K <prabsmails@gmail.com> Closes #4178 from prabeesh/master and squashes the following commits: bd2cb49 [Prabeesh K] adress the comment ccc0765 [prabs] adress the comment 46f9619 [prabs] adress the comment c035bdc [prabs] adress the comment 22dd7f7 [prabs] address the comments 0cc67bd [prabs] adress the comment 838c38e [prabs] adress the comment cd57029 [prabs] address the comments 66919a3 [Prabeesh K] changed MqttDefaultFilePersistence to MemoryPersistence 5857989 [prabs] modified to adhere to accepted coding standards
Diffstat (limited to 'external/mqtt/src/test')
-rw-r--r--external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala12
1 files changed, 6 insertions, 6 deletions
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 19c9271af7..0f3298af62 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -42,8 +42,8 @@ import org.apache.spark.util.Utils
class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
private val batchDuration = Milliseconds(500)
- private val master: String = "local[2]"
- private val framework: String = this.getClass.getSimpleName
+ private val master = "local[2]"
+ private val framework = this.getClass.getSimpleName
private val freePort = findFreePort()
private val brokerUri = "//localhost:" + freePort
private val topic = "def"
@@ -69,7 +69,7 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
test("mqtt input stream") {
val sendMessage = "MQTT demo for spark streaming"
- val receiveStream: ReceiverInputDStream[String] =
+ val receiveStream =
MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
@volatile var receiveMessage: List[String] = List()
receiveStream.foreachRDD { rdd =>
@@ -123,12 +123,12 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
def publishData(data: String): Unit = {
var client: MqttClient = null
try {
- val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
+ val persistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
client.connect()
if (client.isConnected) {
- val msgTopic: MqttTopic = client.getTopic(topic)
- val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
+ val msgTopic = client.getTopic(topic)
+ val message = new MqttMessage(data.getBytes("utf-8"))
message.setQos(1)
message.setRetained(true)