aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-10-23 15:07:59 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-10-23 15:07:59 -0700
commitdd659642e77dca9a123178f98e399720127454c2 (patch)
tree951ebf5620bc4a20890d77abef9e087a330fe088
parent452aa36d671d3fdf53058e80cbd86787c8870cd7 (diff)
parent9ca1bd95305a904637075e4f5747b28571114fb1 (diff)
downloadspark-dd659642e77dca9a123178f98e399720127454c2.tar.gz
spark-dd659642e77dca9a123178f98e399720127454c2.tar.bz2
spark-dd659642e77dca9a123178f98e399720127454c2.zip
Merge pull request #64 from prabeesh/master
MQTT Adapter for Spark Streaming MQTT is a machine-to-machine (M2M)/Internet of Things connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. You may read more about it here http://mqtt.org/ Message Queue Telemetry Transport (MQTT) is an open message protocol for M2M communications. It enables the transfer of telemetry-style data in the form of messages from devices like sensors and actuators, to mobile phones, embedded systems on vehicles, or laptops and full scale computers. The protocol was invented by Andy Stanford-Clark of IBM, and Arlen Nipper of Cirrus Link Solutions This protocol enables a publish/subscribe messaging model in an extremely lightweight way. It is useful for connections with remote locations where line of code and network bandwidth is a constraint. MQTT is one of the widely used protocol for 'Internet of Things'. This protocol is getting much attraction as anything and everything is getting connected to internet and they all produce data. Researchers and companies predict some 25 billion devices will be connected to the internet by 2015. Plugin/Support for MQTT is available in popular MQs like RabbitMQ, ActiveMQ etc. Support for MQTT in Spark will help people with Internet of Things (IoT) projects to use Spark Streaming for their real time data processing needs (from sensors and other embedded devices etc).
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala107
-rw-r--r--project/SparkBuild.scala6
-rw-r--r--streaming/pom.xml5
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala15
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala109
5 files changed, 241 insertions, 1 deletions
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
new file mode 100644
index 0000000000..af698a01d5
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.examples
+
+import org.apache.spark.streaming.{ Seconds, StreamingContext }
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.dstream.MQTTReceiver
+import org.apache.spark.storage.StorageLevel
+
+import org.eclipse.paho.client.mqttv3.MqttClient
+import org.eclipse.paho.client.mqttv3.MqttClientPersistence
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
+import org.eclipse.paho.client.mqttv3.MqttException
+import org.eclipse.paho.client.mqttv3.MqttMessage
+import org.eclipse.paho.client.mqttv3.MqttTopic
+
+/**
+ * A simple Mqtt publisher for demonstration purposes, repeatedly publishes
+ * Space separated String Message "hello mqtt demo for spark streaming"
+ */
+object MQTTPublisher {
+
+ var client: MqttClient = _
+
+ def main(args: Array[String]) {
+ if (args.length < 2) {
+ System.err.println("Usage: MQTTPublisher <MqttBrokerUrl> <topic>")
+ System.exit(1)
+ }
+
+ val Seq(brokerUrl, topic) = args.toSeq
+
+ try {
+ var peristance:MqttClientPersistence =new MqttDefaultFilePersistence("/tmp")
+ client = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance)
+ } catch {
+ case e: MqttException => println("Exception Caught: " + e)
+ }
+
+ client.connect()
+
+ val msgtopic: MqttTopic = client.getTopic(topic);
+ val msg: String = "hello mqtt demo for spark streaming"
+
+ while (true) {
+ val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes())
+ msgtopic.publish(message);
+ println("Published data. topic: " + msgtopic.getName() + " Message: " + message)
+ }
+ client.disconnect()
+ }
+}
+
+/**
+ * A sample wordcount with MqttStream stream
+ *
+ * To work with Mqtt, Mqtt Message broker/server required.
+ * Mosquitto (http://mosquitto.org/) is an open source Mqtt Broker
+ * In ubuntu mosquitto can be installed using the command `$ sudo apt-get install mosquitto`
+ * Eclipse paho project provides Java library for Mqtt Client http://www.eclipse.org/paho/
+ * Example Java code for Mqtt Publisher and Subscriber can be found here https://bitbucket.org/mkjinesh/mqttclient
+ * Usage: MQTTWordCount <master> <MqttbrokerUrl> <topic>
+ * In local mode, <master> should be 'local[n]' with n > 1
+ * <MqttbrokerUrl> and <topic> describe where Mqtt publisher is running.
+ *
+ * To run this example locally, you may run publisher as
+ * `$ ./run-example org.apache.spark.streaming.examples.MQTTPublisher tcp://localhost:1883 foo`
+ * and run the example as
+ * `$ ./run-example org.apache.spark.streaming.examples.MQTTWordCount local[2] tcp://localhost:1883 foo`
+ */
+object MQTTWordCount {
+
+ def main(args: Array[String]) {
+ if (args.length < 3) {
+ System.err.println(
+ "Usage: MQTTWordCount <master> <MqttbrokerUrl> <topic>" +
+ " In local mode, <master> should be 'local[n]' with n > 1")
+ System.exit(1)
+ }
+
+ val Seq(master, brokerUrl, topic) = args.toSeq
+
+ val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"),
+ Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ val lines = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_ONLY)
+
+ val words = lines.flatMap(x => x.toString.split(" "))
+ val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+ wordCounts.print()
+ ssc.start()
+ }
+}
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 17f480e3f0..20f2c018fa 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -108,7 +108,10 @@ object SparkBuild extends Build {
// Shared between both core and streaming.
resolvers ++= Seq("Akka Repository" at "http://repo.akka.io/releases/"),
- // For Sonatype publishing
+ // Shared between both examples and streaming.
+ resolvers ++= Seq("Mqtt Repository" at "https://repo.eclipse.org/content/repositories/paho-releases/"),
+
+ // For Sonatype publishing
resolvers ++= Seq("sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
"sonatype-staging" at "https://oss.sonatype.org/service/local/staging/deploy/maven2/"),
@@ -282,6 +285,7 @@ object SparkBuild extends Build {
"Apache repo" at "https://repository.apache.org/content/repositories/releases"
),
libraryDependencies ++= Seq(
+ "org.eclipse.paho" % "mqtt-client" % "0.4.0",
"org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy),
"org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
"com.typesafe.akka" % "akka-zeromq" % "2.0.5" excludeAll(excludeNetty),
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 339fcd2a39..8022c4fe18 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -136,6 +136,11 @@
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.eclipse.paho</groupId>
+ <artifactId>mqtt-client</artifactId>
+ <version>0.4.0</version>
+ </dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index ee265ab4e9..09c2f7fd8e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -462,6 +462,21 @@ class StreamingContext private (
inputStream
}
+/**
+ * Create an input stream that receives messages pushed by a mqtt publisher.
+ * @param brokerUrl Url of remote mqtt publisher
+ * @param topic topic name to subscribe to
+ * @param storageLevel RDD storage level. Defaults to memory-only.
+ */
+
+ def mqttStream(
+ brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[String] = {
+ val inputStream = new MQTTInputDStream[String](this, brokerUrl, topic, storageLevel)
+ registerInputStream(inputStream)
+ inputStream
+ }
/**
* Create a unified DStream from multiple DStreams of the same type and same interval
*/
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala
new file mode 100644
index 0000000000..ac0528213d
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{ Time, DStreamCheckpointData, StreamingContext }
+
+import java.util.Properties
+import java.util.concurrent.Executors
+import java.io.IOException
+
+import org.eclipse.paho.client.mqttv3.MqttCallback
+import org.eclipse.paho.client.mqttv3.MqttClient
+import org.eclipse.paho.client.mqttv3.MqttClientPersistence
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
+import org.eclipse.paho.client.mqttv3.MqttException
+import org.eclipse.paho.client.mqttv3.MqttMessage
+import org.eclipse.paho.client.mqttv3.MqttTopic
+
+import scala.collection.Map
+import scala.collection.mutable.HashMap
+import scala.collection.JavaConversions._
+
+/**
+ * Input stream that subscribe messages from a Mqtt Broker.
+ * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/
+ * @param brokerUrl Url of remote mqtt publisher
+ * @param topic topic name to subscribe to
+ * @param storageLevel RDD storage level.
+ */
+
+private[streaming]
+class MQTTInputDStream[T: ClassManifest](
+ @transient ssc_ : StreamingContext,
+ brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel
+ ) extends NetworkInputDStream[T](ssc_) with Logging {
+
+ def getReceiver(): NetworkReceiver[T] = {
+ new MQTTReceiver(brokerUrl, topic, storageLevel)
+ .asInstanceOf[NetworkReceiver[T]]
+ }
+}
+
+private[streaming]
+class MQTTReceiver(brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel
+ ) extends NetworkReceiver[Any] {
+ lazy protected val blockGenerator = new BlockGenerator(storageLevel)
+
+ def onStop() {
+ blockGenerator.stop()
+ }
+
+ def onStart() {
+
+ blockGenerator.start()
+
+ // Set up persistence for messages
+ var peristance: MqttClientPersistence = new MemoryPersistence()
+
+ // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
+ var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance)
+
+ // Connect to MqttBroker
+ client.connect()
+
+ // Subscribe to Mqtt topic
+ client.subscribe(topic)
+
+ // Callback automatically triggers as and when new message arrives on specified topic
+ var callback: MqttCallback = new MqttCallback() {
+
+ // Handles Mqtt message
+ override def messageArrived(arg0: String, arg1: MqttMessage) {
+ blockGenerator += new String(arg1.getPayload())
+ }
+
+ override def deliveryComplete(arg0: IMqttDeliveryToken) {
+ }
+
+ override def connectionLost(arg0: Throwable) {
+ logInfo("Connection lost " + arg0)
+ }
+ }
+
+ // Set up callback for MqttClient
+ client.setCallback(callback)
+ }
+}