diff options
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 15 |
1 files changed, 15 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 878725c705..8ed5dfb99b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -450,6 +450,21 @@ class StreamingContext private ( inputStream } +/** + * Create an input stream that receives messages pushed by a mqtt publisher. + * @param brokerUrl Url of remote mqtt publisher + * @param topic topic name to subscribe to + * @param storageLevel RDD storage level. Defaults to memory-only. + */ + + def mqttStream( + brokerUrl: String, + topic: String, + storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[String] = { + val inputStream = new MQTTInputDStream[String](this, brokerUrl, topic, storageLevel) + registerInputStream(inputStream) + inputStream + } /** * Create a unified DStream from multiple DStreams of the same type and same interval */ |