aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorprabeesh <prabsmails@gmail.com>2013-10-16 13:36:25 +0530
committerprabeesh <prabsmails@gmail.com>2013-10-16 13:36:25 +0530
commit2e48b23eae6cfc5e7c825573e2739e54c4569045 (patch)
treeed852a8c2c0423e199dc7e5d7dd2d4b8370d4b09 /streaming
parent742ada91e0415726b865029b1c9e9e1d6ab2ecb8 (diff)
downloadspark-2e48b23eae6cfc5e7c825573e2739e54c4569045.tar.gz
spark-2e48b23eae6cfc5e7c825573e2739e54c4569045.tar.bz2
spark-2e48b23eae6cfc5e7c825573e2739e54c4569045.zip
added mqtt adapter
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala15
1 files changed, 15 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 878725c705..8ed5dfb99b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -450,6 +450,21 @@ class StreamingContext private (
inputStream
}
+/**
+ * Create an input stream that receives messages pushed by a mqtt publisher.
+ * @param brokerUrl Url of remote mqtt publisher
+ * @param topic topic name to subscribe to
+ * @param storageLevel RDD storage level. Defaults to memory-only.
+ */
+
+ def mqttStream(
+ brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[String] = {
+ val inputStream = new MQTTInputDStream[String](this, brokerUrl, topic, storageLevel)
+ registerInputStream(inputStream)
+ inputStream
+ }
/**
* Create a unified DStream from multiple DStreams of the same type and same interval
*/