diff options
author | prabeesh <prabsmails@gmail.com> | 2013-10-16 13:36:25 +0530 |
---|---|---|
committer | prabeesh <prabsmails@gmail.com> | 2013-10-16 13:36:25 +0530 |
commit | 2e48b23eae6cfc5e7c825573e2739e54c4569045 (patch) | |
tree | ed852a8c2c0423e199dc7e5d7dd2d4b8370d4b09 | |
parent | 742ada91e0415726b865029b1c9e9e1d6ab2ecb8 (diff) | |
download | spark-2e48b23eae6cfc5e7c825573e2739e54c4569045.tar.gz spark-2e48b23eae6cfc5e7c825573e2739e54c4569045.tar.bz2 spark-2e48b23eae6cfc5e7c825573e2739e54c4569045.zip |
added mqtt adapter
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 15 |
1 files changed, 15 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 878725c705..8ed5dfb99b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -450,6 +450,21 @@ class StreamingContext private ( inputStream } +/** + * Create an input stream that receives messages pushed by a mqtt publisher. + * @param brokerUrl Url of remote mqtt publisher + * @param topic topic name to subscribe to + * @param storageLevel RDD storage level. Defaults to memory-only. + */ + + def mqttStream( + brokerUrl: String, + topic: String, + storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[String] = { + val inputStream = new MQTTInputDStream[String](this, brokerUrl, topic, storageLevel) + registerInputStream(inputStream) + inputStream + } /** * Create a unified DStream from multiple DStreams of the same type and same interval */ |