aboutsummaryrefslogtreecommitdiff
path: root/external/mqtt
diff options
context:
space:
mode:
authorbilna <bilnap@am.amrita.edu>2015-01-09 14:45:28 -0800
committerAndrew Or <andrew@databricks.com>2015-01-09 14:45:28 -0800
commit4e1f12d997426560226648d62ee17c90352613e7 (patch)
tree9de747de0e6f0be2257a2a3f56604e2334379cc7 /external/mqtt
parentae628725abce9ffe34b9a7110d5ac51a076454aa (diff)
downloadspark-4e1f12d997426560226648d62ee17c90352613e7.tar.gz
spark-4e1f12d997426560226648d62ee17c90352613e7.tar.bz2
spark-4e1f12d997426560226648d62ee17c90352613e7.zip
[Minor] Fix import order and other coding style
fixed import order and other coding style Author: bilna <bilnap@am.amrita.edu> Author: Bilna P <bilna.p@gmail.com> Closes #3966 from Bilna/master and squashes the following commits: 5e76f04 [bilna] fix import order and other coding style 5718d66 [bilna] Merge remote-tracking branch 'upstream/master' ae56514 [bilna] Merge remote-tracking branch 'upstream/master' acea3a3 [bilna] Adding dependency with scope test 28681fa [bilna] Merge remote-tracking branch 'upstream/master' fac3904 [bilna] Correction in Indentation and coding style ed9db4c [bilna] Merge remote-tracking branch 'upstream/master' 4b34ee7 [Bilna P] Update MQTTStreamSuite.scala 04503cf [bilna] Added embedded broker service for mqtt test 89d804e [bilna] Merge remote-tracking branch 'upstream/master' fc8eb28 [bilna] Merge remote-tracking branch 'upstream/master' 4b58094 [Bilna P] Update MQTTStreamSuite.scala b1ac4ad [bilna] Added BeforeAndAfter 5f6bfd2 [bilna] Added BeforeAndAfter e8b6623 [Bilna P] Update MQTTStreamSuite.scala 5ca6691 [Bilna P] Update MQTTStreamSuite.scala 8616495 [bilna] [SPARK-4631] unit test for MQTT
Diffstat (limited to 'external/mqtt')
-rw-r--r--external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala17
1 files changed, 11 insertions, 6 deletions
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 98fe6cb301..39eb8b1834 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -19,16 +19,19 @@ package org.apache.spark.streaming.mqtt
import java.net.{URI, ServerSocket}
+import scala.concurrent.duration._
+
import org.apache.activemq.broker.{TransportConnector, BrokerService}
-import org.apache.spark.util.Utils
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
+
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.concurrent.Eventually
-import scala.concurrent.duration._
+
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
-import org.eclipse.paho.client.mqttv3._
-import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
+import org.apache.spark.util.Utils
class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
@@ -38,8 +41,9 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
private val freePort = findFreePort()
private val brokerUri = "//localhost:" + freePort
private val topic = "def"
- private var ssc: StreamingContext = _
private val persistenceDir = Utils.createTempDir()
+
+ private var ssc: StreamingContext = _
private var broker: BrokerService = _
private var connector: TransportConnector = _
@@ -115,8 +119,9 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
message.setQos(1)
message.setRetained(true)
- for (i <- 0 to 100)
+ for (i <- 0 to 100) {
msgTopic.publish(message)
+ }
}
} finally {
client.disconnect()