diff options
author | Prabeesh K <prabsmails@gmail.com> | 2013-10-18 09:09:49 +0530 |
---|---|---|
committer | Prabeesh K <prabsmails@gmail.com> | 2013-10-18 09:09:49 +0530 |
commit | d223d38933b440df372dce38c6f4181586011c9e (patch) | |
tree | fdc2e005f9cadf15e311ed82a69db43ce7a56d4f /streaming | |
parent | 890f8fe4393a20749e0a6cfd57ff07f60cfad2a1 (diff) | |
download | spark-d223d38933b440df372dce38c6f4181586011c9e.tar.gz spark-d223d38933b440df372dce38c6f4181586011c9e.tar.bz2 spark-d223d38933b440df372dce38c6f4181586011c9e.zip |
Update MQTTInputDStream.scala
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala | 15 |
1 files changed, 11 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala index 9b3fe67e6a..ac0528213d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala @@ -46,24 +46,31 @@ import scala.collection.JavaConversions._ * @param storageLevel RDD storage level. */ -private[streaming] class MQTTInputDStream[T: ClassManifest]( +private[streaming] +class MQTTInputDStream[T: ClassManifest]( @transient ssc_ : StreamingContext, brokerUrl: String, topic: String, - storageLevel: StorageLevel) extends NetworkInputDStream[T](ssc_) with Logging { + storageLevel: StorageLevel + ) extends NetworkInputDStream[T](ssc_) with Logging { + def getReceiver(): NetworkReceiver[T] = { new MQTTReceiver(brokerUrl, topic, storageLevel) .asInstanceOf[NetworkReceiver[T]] } } -private[streaming] class MQTTReceiver(brokerUrl: String, +private[streaming] +class MQTTReceiver(brokerUrl: String, topic: String, - storageLevel: StorageLevel) extends NetworkReceiver[Any] { + storageLevel: StorageLevel + ) extends NetworkReceiver[Any] { lazy protected val blockGenerator = new BlockGenerator(storageLevel) + def onStop() { blockGenerator.stop() } + def onStart() { blockGenerator.start() |