aboutsummaryrefslogtreecommitdiff
path: root/external/mqtt
diff options
context:
space:
mode:
authorSandeep <sandeep@techaddict.me>2014-04-10 15:04:13 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-04-10 15:04:13 -0700
commit930b70f0523e96fe01c1317ef7fad1b76b36d4d9 (patch)
treefba70b8897f6c5ae1123e4717d8efdb4d4b0acc4 /external/mqtt
parentf0466625200842f3cc486e9aa1caa417586be533 (diff)
downloadspark-930b70f0523e96fe01c1317ef7fad1b76b36d4d9.tar.gz
spark-930b70f0523e96fe01c1317ef7fad1b76b36d4d9.tar.bz2
spark-930b70f0523e96fe01c1317ef7fad1b76b36d4d9.zip
Remove Unnecessary Whitespace's
stack these together in a commit else they show up chunk by chunk in different commits. Author: Sandeep <sandeep@techaddict.me> Closes #380 from techaddict/white_space and squashes the following commits: b58f294 [Sandeep] Remove Unnecessary Whitespace's
Diffstat (limited to 'external/mqtt')
-rw-r--r--external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala16
1 files changed, 8 insertions, 8 deletions
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 41e813d48c..1204cfba39 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -48,41 +48,41 @@ import org.apache.spark.streaming.dstream._
* @param storageLevel RDD storage level.
*/
-private[streaming]
+private[streaming]
class MQTTInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
) extends NetworkInputDStream[T](ssc_) with Logging {
-
+
def getReceiver(): NetworkReceiver[T] = {
new MQTTReceiver(brokerUrl, topic, storageLevel).asInstanceOf[NetworkReceiver[T]]
}
}
-private[streaming]
+private[streaming]
class MQTTReceiver(brokerUrl: String,
topic: String,
storageLevel: StorageLevel
) extends NetworkReceiver[Any] {
lazy protected val blockGenerator = new BlockGenerator(storageLevel)
-
+
def onStop() {
blockGenerator.stop()
}
-
+
def onStart() {
blockGenerator.start()
- // Set up persistence for messages
+ // Set up persistence for messages
var peristance: MqttClientPersistence = new MemoryPersistence()
// Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
var client: MqttClient = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance)
- // Connect to MqttBroker
+ // Connect to MqttBroker
client.connect()
// Subscribe to Mqtt topic
@@ -91,7 +91,7 @@ class MQTTReceiver(brokerUrl: String,
// Callback automatically triggers as and when new message arrives on specified topic
var callback: MqttCallback = new MqttCallback() {
- // Handles Mqtt message
+ // Handles Mqtt message
override def messageArrived(arg0: String, arg1: MqttMessage) {
blockGenerator += new String(arg1.getPayload())
}