aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorPrabeesh K <prabsmails@gmail.com>2013-10-18 09:09:49 +0530
committerPrabeesh K <prabsmails@gmail.com>2013-10-18 09:09:49 +0530
commitd223d38933b440df372dce38c6f4181586011c9e (patch)
treefdc2e005f9cadf15e311ed82a69db43ce7a56d4f /streaming
parent890f8fe4393a20749e0a6cfd57ff07f60cfad2a1 (diff)
downloadspark-d223d38933b440df372dce38c6f4181586011c9e.tar.gz
spark-d223d38933b440df372dce38c6f4181586011c9e.tar.bz2
spark-d223d38933b440df372dce38c6f4181586011c9e.zip
Update MQTTInputDStream.scala
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala15
1 files changed, 11 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala
index 9b3fe67e6a..ac0528213d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala
@@ -46,24 +46,31 @@ import scala.collection.JavaConversions._
* @param storageLevel RDD storage level.
*/
-private[streaming] class MQTTInputDStream[T: ClassManifest](
+private[streaming]
+class MQTTInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
brokerUrl: String,
topic: String,
- storageLevel: StorageLevel) extends NetworkInputDStream[T](ssc_) with Logging {
+ storageLevel: StorageLevel
+ ) extends NetworkInputDStream[T](ssc_) with Logging {
+
def getReceiver(): NetworkReceiver[T] = {
new MQTTReceiver(brokerUrl, topic, storageLevel)
.asInstanceOf[NetworkReceiver[T]]
}
}
-private[streaming] class MQTTReceiver(brokerUrl: String,
+private[streaming]
+class MQTTReceiver(brokerUrl: String,
topic: String,
- storageLevel: StorageLevel) extends NetworkReceiver[Any] {
+ storageLevel: StorageLevel
+ ) extends NetworkReceiver[Any] {
lazy protected val blockGenerator = new BlockGenerator(storageLevel)
+
def onStop() {
blockGenerator.stop()
}
+
def onStart() {
blockGenerator.start()