aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala15
1 files changed, 11 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala
index 9b3fe67e6a..ac0528213d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala
@@ -46,24 +46,31 @@ import scala.collection.JavaConversions._
* @param storageLevel RDD storage level.
*/
-private[streaming] class MQTTInputDStream[T: ClassManifest](
+private[streaming]
+class MQTTInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
brokerUrl: String,
topic: String,
- storageLevel: StorageLevel) extends NetworkInputDStream[T](ssc_) with Logging {
+ storageLevel: StorageLevel
+ ) extends NetworkInputDStream[T](ssc_) with Logging {
+
def getReceiver(): NetworkReceiver[T] = {
new MQTTReceiver(brokerUrl, topic, storageLevel)
.asInstanceOf[NetworkReceiver[T]]
}
}
-private[streaming] class MQTTReceiver(brokerUrl: String,
+private[streaming]
+class MQTTReceiver(brokerUrl: String,
topic: String,
- storageLevel: StorageLevel) extends NetworkReceiver[Any] {
+ storageLevel: StorageLevel
+ ) extends NetworkReceiver[Any] {
lazy protected val blockGenerator = new BlockGenerator(storageLevel)
+
def onStop() {
blockGenerator.stop()
}
+
def onStart() {
blockGenerator.start()