diff options
Diffstat (limited to 'external/mqtt/src')
5 files changed, 287 insertions, 0 deletions
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala new file mode 100644 index 0000000000..c8987a3ee0 --- /dev/null +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.mqtt + +import scala.collection.Map +import scala.collection.mutable.HashMap +import scala.collection.JavaConversions._ +import scala.reflect.ClassTag + +import java.util.Properties +import java.util.concurrent.Executors +import java.io.IOException + +import org.eclipse.paho.client.mqttv3.MqttCallback +import org.eclipse.paho.client.mqttv3.MqttClient +import org.eclipse.paho.client.mqttv3.MqttClientPersistence +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken +import org.eclipse.paho.client.mqttv3.MqttException +import org.eclipse.paho.client.mqttv3.MqttMessage +import org.eclipse.paho.client.mqttv3.MqttTopic + +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream._ + +/** + * Input stream that subscribe messages from a Mqtt Broker. + * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/ + * @param brokerUrl Url of remote mqtt publisher + * @param topic topic name to subscribe to + * @param storageLevel RDD storage level. + */ + +private[streaming] +class MQTTInputDStream[T: ClassTag]( + @transient ssc_ : StreamingContext, + brokerUrl: String, + topic: String, + storageLevel: StorageLevel + ) extends NetworkInputDStream[T](ssc_) with Logging { + + def getReceiver(): NetworkReceiver[T] = { + new MQTTReceiver(brokerUrl, topic, storageLevel).asInstanceOf[NetworkReceiver[T]] + } +} + +private[streaming] +class MQTTReceiver(brokerUrl: String, + topic: String, + storageLevel: StorageLevel + ) extends NetworkReceiver[Any] { + lazy protected val blockGenerator = new BlockGenerator(storageLevel) + + def onStop() { + blockGenerator.stop() + } + + def onStart() { + + blockGenerator.start() + + // Set up persistence for messages + var peristance: MqttClientPersistence = new MemoryPersistence() + + // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance + var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance) + + // Connect to MqttBroker + client.connect() + + // Subscribe to Mqtt topic + client.subscribe(topic) + + // Callback automatically triggers as and when new message arrives on specified topic + var callback: MqttCallback = new MqttCallback() { + + // Handles Mqtt message + override def messageArrived(arg0: String, arg1: MqttMessage) { + blockGenerator += new String(arg1.getPayload()) + } + + override def deliveryComplete(arg0: IMqttDeliveryToken) { + } + + override def connectionLost(arg0: Throwable) { + logInfo("Connection lost " + arg0) + } + } + + // Set up callback for MqttClient + client.setCallback(callback) + } +} diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala new file mode 100644 index 0000000000..0e6c25dbee --- /dev/null +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.mqtt + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{StreamingContext, DStream} +import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream} +import scala.reflect.ClassTag + +object MQTTUtils { + /** + * Create an input stream that receives messages pushed by a MQTT publisher. + * @param ssc StreamingContext object + * @param brokerUrl Url of remote MQTT publisher + * @param topic Topic name to subscribe to + * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2. + */ + def createStream( + ssc: StreamingContext, + brokerUrl: String, + topic: String, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[String] = { + val inputStream = new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel) + ssc.registerInputStream(inputStream) + inputStream + } + + /** + * Create an input stream that receives messages pushed by a MQTT publisher. + * @param jssc JavaStreamingContext object + * @param brokerUrl Url of remote MQTT publisher + * @param topic Topic name to subscribe to + */ + def createStream( + jssc: JavaStreamingContext, + brokerUrl: String, + topic: String + ): JavaDStream[String] = { + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] + createStream(jssc.ssc, brokerUrl, topic) + } + + /** + * Create an input stream that receives messages pushed by a MQTT publisher. + * @param jssc JavaStreamingContext object + * @param brokerUrl Url of remote MQTT publisher + * @param topic Topic name to subscribe to + * @param storageLevel RDD storage level. + */ + def createStream( + jssc: JavaStreamingContext, + brokerUrl: String, + topic: String, + storageLevel: StorageLevel + ): JavaDStream[String] = { + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] + createStream(jssc.ssc, brokerUrl, topic, storageLevel) + } +} diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java new file mode 100644 index 0000000000..44743aaecf --- /dev/null +++ b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.mqtt; + +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.junit.Test; + +import org.apache.spark.streaming.LocalJavaStreamingContext; + +public class JavaMQTTStreamSuite extends LocalJavaStreamingContext { + @Test + public void testMQTTStream() { + String brokerUrl = "abc"; + String topic = "def"; + + // tests the API, does not actually test data receiving + JavaDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic); + JavaDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic, + StorageLevel.MEMORY_AND_DISK_SER_2()); + } +} diff --git a/external/mqtt/src/test/resources/log4j.properties b/external/mqtt/src/test/resources/log4j.properties new file mode 100644 index 0000000000..063529a9cb --- /dev/null +++ b/external/mqtt/src/test/resources/log4j.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=INFO, file +# log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=streaming/target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.eclipse.jetty=WARN + diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala new file mode 100644 index 0000000000..fcc159e85a --- /dev/null +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.mqtt + +import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} +import org.apache.spark.storage.StorageLevel + +class MQTTStreamSuite extends TestSuiteBase { + + test("MQTT input stream") { + val ssc = new StreamingContext(master, framework, batchDuration) + val brokerUrl = "abc" + val topic = "def" + + // tests the API, does not actually test data receiving + val test1 = MQTTUtils.createStream(ssc, brokerUrl, topic) + val test2 = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2) + + // TODO: Actually test receiving data + } +} |