diff options
author | Sandeep <sandeep@techaddict.me> | 2014-04-10 15:04:13 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-04-10 15:04:13 -0700 |
commit | 930b70f0523e96fe01c1317ef7fad1b76b36d4d9 (patch) | |
tree | fba70b8897f6c5ae1123e4717d8efdb4d4b0acc4 /external/mqtt/src | |
parent | f0466625200842f3cc486e9aa1caa417586be533 (diff) | |
download | spark-930b70f0523e96fe01c1317ef7fad1b76b36d4d9.tar.gz spark-930b70f0523e96fe01c1317ef7fad1b76b36d4d9.tar.bz2 spark-930b70f0523e96fe01c1317ef7fad1b76b36d4d9.zip |
Remove Unnecessary Whitespace's
stack these together in a commit else they show up chunk by chunk in different commits.
Author: Sandeep <sandeep@techaddict.me>
Closes #380 from techaddict/white_space and squashes the following commits:
b58f294 [Sandeep] Remove Unnecessary Whitespace's
Diffstat (limited to 'external/mqtt/src')
-rw-r--r-- | external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala | 16 |
1 files changed, 8 insertions, 8 deletions
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala index 41e813d48c..1204cfba39 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala @@ -48,41 +48,41 @@ import org.apache.spark.streaming.dstream._ * @param storageLevel RDD storage level. */ -private[streaming] +private[streaming] class MQTTInputDStream[T: ClassTag]( @transient ssc_ : StreamingContext, brokerUrl: String, topic: String, storageLevel: StorageLevel ) extends NetworkInputDStream[T](ssc_) with Logging { - + def getReceiver(): NetworkReceiver[T] = { new MQTTReceiver(brokerUrl, topic, storageLevel).asInstanceOf[NetworkReceiver[T]] } } -private[streaming] +private[streaming] class MQTTReceiver(brokerUrl: String, topic: String, storageLevel: StorageLevel ) extends NetworkReceiver[Any] { lazy protected val blockGenerator = new BlockGenerator(storageLevel) - + def onStop() { blockGenerator.stop() } - + def onStart() { blockGenerator.start() - // Set up persistence for messages + // Set up persistence for messages var peristance: MqttClientPersistence = new MemoryPersistence() // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance var client: MqttClient = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance) - // Connect to MqttBroker + // Connect to MqttBroker client.connect() // Subscribe to Mqtt topic @@ -91,7 +91,7 @@ class MQTTReceiver(brokerUrl: String, // Callback automatically triggers as and when new message arrives on specified topic var callback: MqttCallback = new MqttCallback() { - // Handles Mqtt message + // Handles Mqtt message override def messageArrived(arg0: String, arg1: MqttMessage) { blockGenerator += new String(arg1.getPayload()) } |