aboutsummaryrefslogtreecommitdiff
path: root/external/mqtt
diff options
context:
space:
mode:
Diffstat (limited to 'external/mqtt')
-rw-r--r--external/mqtt/pom.xml108
-rw-r--r--external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala59
-rw-r--r--external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala43
-rw-r--r--external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala110
-rw-r--r--external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala24
-rw-r--r--external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java39
-rw-r--r--external/mqtt/src/test/resources/log4j.properties29
-rw-r--r--external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala36
8 files changed, 448 insertions, 0 deletions
diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml
new file mode 100644
index 0000000000..31b4fa87de
--- /dev/null
+++ b/external/mqtt/pom.xml
@@ -0,0 +1,108 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent</artifactId>
+ <version>0.9.0-incubating-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-mqtt_2.10</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project External MQTT</name>
+ <url>http://spark.incubator.apache.org/</url>
+
+ <repositories>
+ <repository>
+ <id>mqtt-repo</id>
+ <name>MQTT Repository</name>
+ <url>https://repo.eclipse.org/content/repositories/paho-releases</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.paho</groupId>
+ <artifactId>mqtt-client</artifactId>
+ <version>0.4.0</version>
+ </dependency>
+ <dependency>
+ <groupId>${akka.group}</groupId>
+ <artifactId>akka-zeromq_${scala.binary.version}</artifactId>
+ <version>${akka.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.novocode</groupId>
+ <artifactId>junit-interface</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala
new file mode 100644
index 0000000000..72124956fc
--- /dev/null
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java.mqtt
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.mqtt._
+
+/**
+ * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
+ * functions for creating MQTT input streams.
+ */
+class MQTTFunctions(javaStreamingContext: JavaStreamingContext) {
+
+ /**
+ * Create an input stream that receives messages pushed by a MQTT publisher.
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topic topic name to subscribe to
+ */
+ def mqttStream(
+ brokerUrl: String,
+ topic: String
+ ): JavaDStream[String] = {
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ javaStreamingContext.ssc.mqttStream(brokerUrl, topic)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a MQTT publisher.
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topic topic name to subscribe to
+ * @param storageLevel RDD storage level.
+ */
+ def mqttStream(
+ brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): JavaDStream[String] = {
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ javaStreamingContext.ssc.mqttStream(brokerUrl, topic, storageLevel)
+ }
+}
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala
new file mode 100644
index 0000000000..86f4e9c724
--- /dev/null
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.mqtt
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+
+/**
+ * Extra MQTT input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
+ * through implicit conversions. Import org.apache.spark.streaming.mqtt._ to use these functions.
+ */
+class MQTTFunctions(ssc: StreamingContext) {
+ /**
+ * Create an input stream that receives messages pushed by a MQTT publisher.
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topic topic name to subscribe to
+ * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+ */
+ def mqttStream(
+ brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[String] = {
+ val inputStream = new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel)
+ ssc.registerInputStream(inputStream)
+ inputStream
+ }
+}
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
new file mode 100644
index 0000000000..c8987a3ee0
--- /dev/null
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.mqtt
+
+import scala.collection.Map
+import scala.collection.mutable.HashMap
+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
+
+import java.util.Properties
+import java.util.concurrent.Executors
+import java.io.IOException
+
+import org.eclipse.paho.client.mqttv3.MqttCallback
+import org.eclipse.paho.client.mqttv3.MqttClient
+import org.eclipse.paho.client.mqttv3.MqttClientPersistence
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
+import org.eclipse.paho.client.mqttv3.MqttException
+import org.eclipse.paho.client.mqttv3.MqttMessage
+import org.eclipse.paho.client.mqttv3.MqttTopic
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream._
+
+/**
+ * Input stream that subscribe messages from a Mqtt Broker.
+ * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/
+ * @param brokerUrl Url of remote mqtt publisher
+ * @param topic topic name to subscribe to
+ * @param storageLevel RDD storage level.
+ */
+
+private[streaming]
+class MQTTInputDStream[T: ClassTag](
+ @transient ssc_ : StreamingContext,
+ brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel
+ ) extends NetworkInputDStream[T](ssc_) with Logging {
+
+ def getReceiver(): NetworkReceiver[T] = {
+ new MQTTReceiver(brokerUrl, topic, storageLevel).asInstanceOf[NetworkReceiver[T]]
+ }
+}
+
+private[streaming]
+class MQTTReceiver(brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel
+ ) extends NetworkReceiver[Any] {
+ lazy protected val blockGenerator = new BlockGenerator(storageLevel)
+
+ def onStop() {
+ blockGenerator.stop()
+ }
+
+ def onStart() {
+
+ blockGenerator.start()
+
+ // Set up persistence for messages
+ var peristance: MqttClientPersistence = new MemoryPersistence()
+
+ // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
+ var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance)
+
+ // Connect to MqttBroker
+ client.connect()
+
+ // Subscribe to Mqtt topic
+ client.subscribe(topic)
+
+ // Callback automatically triggers as and when new message arrives on specified topic
+ var callback: MqttCallback = new MqttCallback() {
+
+ // Handles Mqtt message
+ override def messageArrived(arg0: String, arg1: MqttMessage) {
+ blockGenerator += new String(arg1.getPayload())
+ }
+
+ override def deliveryComplete(arg0: IMqttDeliveryToken) {
+ }
+
+ override def connectionLost(arg0: Throwable) {
+ logInfo("Connection lost " + arg0)
+ }
+ }
+
+ // Set up callback for MqttClient
+ client.setCallback(callback)
+ }
+}
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala
new file mode 100644
index 0000000000..28a944f57e
--- /dev/null
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+package object mqtt {
+ implicit def sscToMQTTFunctions(ssc: StreamingContext) = new MQTTFunctions(ssc)
+}
+
+
diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
new file mode 100644
index 0000000000..3ddb4d084f
--- /dev/null
+++ b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.mqtt;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.mqtt.MQTTFunctions;
+import org.junit.Test;
+
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+
+public class JavaMQTTStreamSuite extends LocalJavaStreamingContext {
+ @Test
+ public void testMQTTStream() {
+ String brokerUrl = "abc";
+ String topic = "def";
+ MQTTFunctions mqttFunc = new MQTTFunctions(ssc);
+
+ // tests the API, does not actually test data receiving
+ JavaDStream<String> test1 = mqttFunc.mqttStream(brokerUrl, topic);
+ JavaDStream<String> test2 = mqttFunc.mqttStream(brokerUrl, topic,
+ StorageLevel.MEMORY_AND_DISK_SER_2());
+ }
+}
diff --git a/external/mqtt/src/test/resources/log4j.properties b/external/mqtt/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..063529a9cb
--- /dev/null
+++ b/external/mqtt/src/test/resources/log4j.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file streaming/target/unit-tests.log
+log4j.rootCategory=INFO, file
+# log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
new file mode 100644
index 0000000000..ab6542918b
--- /dev/null
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.mqtt
+
+import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+import org.apache.spark.storage.StorageLevel
+
+class MQTTStreamSuite extends TestSuiteBase {
+
+ test("MQTT input stream") {
+ val ssc = new StreamingContext(master, framework, batchDuration)
+ val brokerUrl = "abc"
+ val topic = "def"
+
+ // tests the API, does not actually test data receiving
+ val test1 = ssc.mqttStream(brokerUrl, topic)
+ val test2 = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
+
+ // TODO: Actually test receiving data
+ }
+}