diff options
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala | 15 |
1 files changed, 11 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala index 9b3fe67e6a..ac0528213d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala @@ -46,24 +46,31 @@ import scala.collection.JavaConversions._ * @param storageLevel RDD storage level. */ -private[streaming] class MQTTInputDStream[T: ClassManifest]( +private[streaming] +class MQTTInputDStream[T: ClassManifest]( @transient ssc_ : StreamingContext, brokerUrl: String, topic: String, - storageLevel: StorageLevel) extends NetworkInputDStream[T](ssc_) with Logging { + storageLevel: StorageLevel + ) extends NetworkInputDStream[T](ssc_) with Logging { + def getReceiver(): NetworkReceiver[T] = { new MQTTReceiver(brokerUrl, topic, storageLevel) .asInstanceOf[NetworkReceiver[T]] } } -private[streaming] class MQTTReceiver(brokerUrl: String, +private[streaming] +class MQTTReceiver(brokerUrl: String, topic: String, - storageLevel: StorageLevel) extends NetworkReceiver[Any] { + storageLevel: StorageLevel + ) extends NetworkReceiver[Any] { lazy protected val blockGenerator = new BlockGenerator(storageLevel) + def onStop() { blockGenerator.stop() } + def onStart() { blockGenerator.start() |