diff options
Diffstat (limited to 'external/mqtt/src')
10 files changed, 0 insertions, 581 deletions
diff --git a/external/mqtt/src/main/assembly/assembly.xml b/external/mqtt/src/main/assembly/assembly.xml deleted file mode 100644 index c110b01b34..0000000000 --- a/external/mqtt/src/main/assembly/assembly.xml +++ /dev/null @@ -1,44 +0,0 @@ -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> -<assembly> - <id>test-jar-with-dependencies</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - - <fileSets> - <fileSet> - <directory>${project.build.directory}/scala-${scala.binary.version}/test-classes</directory> - <outputDirectory></outputDirectory> - </fileSet> - </fileSets> - - <dependencySets> - <dependencySet> - <useTransitiveDependencies>true</useTransitiveDependencies> - <scope>test</scope> - <unpack>true</unpack> - <excludes> - <exclude>org.apache.hadoop:*:jar</exclude> - <exclude>org.apache.zookeeper:*:jar</exclude> - <exclude>org.apache.avro:*:jar</exclude> - </excludes> - </dependencySet> - </dependencySets> - -</assembly> diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala deleted file mode 100644 index cbad6f7fe4..0000000000 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.mqtt - -import java.nio.charset.StandardCharsets - -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken -import org.eclipse.paho.client.mqttv3.MqttCallback -import org.eclipse.paho.client.mqttv3.MqttClient -import org.eclipse.paho.client.mqttv3.MqttMessage -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence - -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.dstream._ -import org.apache.spark.streaming.receiver.Receiver - -/** - * Input stream that subscribe messages from a Mqtt Broker. - * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/ - * @param brokerUrl Url of remote mqtt publisher - * @param topic topic name to subscribe to - * @param storageLevel RDD storage level. - */ - -private[streaming] -class MQTTInputDStream( - _ssc: StreamingContext, - brokerUrl: String, - topic: String, - storageLevel: StorageLevel - ) extends ReceiverInputDStream[String](_ssc) { - - private[streaming] override def name: String = s"MQTT stream [$id]" - - def getReceiver(): Receiver[String] = { - new MQTTReceiver(brokerUrl, topic, storageLevel) - } -} - -private[streaming] -class MQTTReceiver( - brokerUrl: String, - topic: String, - storageLevel: StorageLevel - ) extends Receiver[String](storageLevel) { - - def onStop() { - - } - - def onStart() { - - // Set up persistence for messages - val persistence = new MemoryPersistence() - - // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance - val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence) - - // Callback automatically triggers as and when new message arrives on specified topic - val callback = new MqttCallback() { - - // Handles Mqtt message - override def messageArrived(topic: String, message: MqttMessage) { - store(new String(message.getPayload(), StandardCharsets.UTF_8)) - } - - override def deliveryComplete(token: IMqttDeliveryToken) { - } - - override def connectionLost(cause: Throwable) { - restart("Connection lost ", cause) - } - } - - // Set up callback for MqttClient. This needs to happen before - // connecting or subscribing, otherwise messages may be lost - client.setCallback(callback) - - // Connect to MqttBroker - client.connect() - - // Subscribe to Mqtt topic - client.subscribe(topic) - - } -} diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala deleted file mode 100644 index 7b8d56d6fa..0000000000 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.mqtt - -import scala.reflect.ClassTag - -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.api.java.{JavaDStream, JavaReceiverInputDStream, JavaStreamingContext} -import org.apache.spark.streaming.dstream.ReceiverInputDStream - -object MQTTUtils { - /** - * Create an input stream that receives messages pushed by a MQTT publisher. - * @param ssc StreamingContext object - * @param brokerUrl Url of remote MQTT publisher - * @param topic Topic name to subscribe to - * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2. - */ - def createStream( - ssc: StreamingContext, - brokerUrl: String, - topic: String, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): ReceiverInputDStream[String] = { - new MQTTInputDStream(ssc, brokerUrl, topic, storageLevel) - } - - /** - * Create an input stream that receives messages pushed by a MQTT publisher. - * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. - * @param jssc JavaStreamingContext object - * @param brokerUrl Url of remote MQTT publisher - * @param topic Topic name to subscribe to - */ - def createStream( - jssc: JavaStreamingContext, - brokerUrl: String, - topic: String - ): JavaReceiverInputDStream[String] = { - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] - createStream(jssc.ssc, brokerUrl, topic) - } - - /** - * Create an input stream that receives messages pushed by a MQTT publisher. - * @param jssc JavaStreamingContext object - * @param brokerUrl Url of remote MQTT publisher - * @param topic Topic name to subscribe to - * @param storageLevel RDD storage level. - */ - def createStream( - jssc: JavaStreamingContext, - brokerUrl: String, - topic: String, - storageLevel: StorageLevel - ): JavaReceiverInputDStream[String] = { - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] - createStream(jssc.ssc, brokerUrl, topic, storageLevel) - } -} - -/** - * This is a helper class that wraps the methods in MQTTUtils into more Python-friendly class and - * function so that it can be easily instantiated and called from Python's MQTTUtils. - */ -private[mqtt] class MQTTUtilsPythonHelper { - - def createStream( - jssc: JavaStreamingContext, - brokerUrl: String, - topic: String, - storageLevel: StorageLevel - ): JavaDStream[String] = { - MQTTUtils.createStream(jssc, brokerUrl, topic, storageLevel) - } -} diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package-info.java b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package-info.java deleted file mode 100644 index 728e0d8663..0000000000 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * MQTT receiver for Spark Streaming. - */ -package org.apache.spark.streaming.mqtt;
\ No newline at end of file diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala deleted file mode 100644 index 63d0d13818..0000000000 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming - -/** - * MQTT receiver for Spark Streaming. - */ -package object mqtt diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java deleted file mode 100644 index cfedb5a042..0000000000 --- a/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming; - -import org.apache.spark.SparkConf; -import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.junit.After; -import org.junit.Before; - -public abstract class LocalJavaStreamingContext { - - protected transient JavaStreamingContext ssc; - - @Before - public void setUp() { - SparkConf conf = new SparkConf() - .setMaster("local[2]") - .setAppName("test") - .set("spark.streaming.clock", "org.apache.spark.util.ManualClock"); - ssc = new JavaStreamingContext(conf, new Duration(1000)); - ssc.checkpoint("checkpoint"); - } - - @After - public void tearDown() { - ssc.stop(); - ssc = null; - } -} diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java deleted file mode 100644 index ce5aa1e0cd..0000000000 --- a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.mqtt; - -import org.apache.spark.storage.StorageLevel; -import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; -import org.junit.Test; - -import org.apache.spark.streaming.LocalJavaStreamingContext; - -public class JavaMQTTStreamSuite extends LocalJavaStreamingContext { - @Test - public void testMQTTStream() { - String brokerUrl = "abc"; - String topic = "def"; - - // tests the API, does not actually test data receiving - JavaReceiverInputDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic); - JavaReceiverInputDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic, - StorageLevel.MEMORY_AND_DISK_SER_2()); - } -} diff --git a/external/mqtt/src/test/resources/log4j.properties b/external/mqtt/src/test/resources/log4j.properties deleted file mode 100644 index 75e3b53a09..0000000000 --- a/external/mqtt/src/test/resources/log4j.properties +++ /dev/null @@ -1,28 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Set everything to be logged to the file target/unit-tests.log -log4j.rootCategory=INFO, file -log4j.appender.file=org.apache.log4j.FileAppender -log4j.appender.file.append=true -log4j.appender.file.file=target/unit-tests.log -log4j.appender.file.layout=org.apache.log4j.PatternLayout -log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n - -# Ignore messages below warning level from Jetty, because it's a bit verbose -log4j.logger.org.spark-project.jetty=WARN - diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala deleted file mode 100644 index fdcd18c6fb..0000000000 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.mqtt - -import scala.concurrent.duration._ -import scala.language.postfixOps - -import org.scalatest.BeforeAndAfter -import org.scalatest.concurrent.Eventually - -import org.apache.spark.SparkFunSuite -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Milliseconds, StreamingContext} - -class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter { - - private val batchDuration = Milliseconds(500) - private val master = "local[2]" - private val framework = this.getClass.getSimpleName - private val topic = "def" - - private var ssc: StreamingContext = _ - private var mqttTestUtils: MQTTTestUtils = _ - - before { - ssc = new StreamingContext(master, framework, batchDuration) - mqttTestUtils = new MQTTTestUtils - mqttTestUtils.setup() - } - - after { - if (ssc != null) { - ssc.stop() - ssc = null - } - if (mqttTestUtils != null) { - mqttTestUtils.teardown() - mqttTestUtils = null - } - } - - test("mqtt input stream") { - val sendMessage = "MQTT demo for spark streaming" - val receiveStream = MQTTUtils.createStream(ssc, "tcp://" + mqttTestUtils.brokerUri, topic, - StorageLevel.MEMORY_ONLY) - - @volatile var receiveMessage: List[String] = List() - receiveStream.foreachRDD { rdd => - if (rdd.collect.length > 0) { - receiveMessage = receiveMessage ::: List(rdd.first) - receiveMessage - } - } - - ssc.start() - - // Retry it because we don't know when the receiver will start. - eventually(timeout(10000 milliseconds), interval(100 milliseconds)) { - mqttTestUtils.publishData(topic, sendMessage) - assert(sendMessage.equals(receiveMessage(0))) - } - ssc.stop() - } -} diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala deleted file mode 100644 index 3680c13605..0000000000 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.mqtt - -import java.net.{ServerSocket, URI} -import java.nio.charset.StandardCharsets - -import scala.language.postfixOps - -import org.apache.activemq.broker.{BrokerService, TransportConnector} -import org.apache.commons.lang3.RandomUtils -import org.eclipse.paho.client.mqttv3._ -import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence - -import org.apache.spark.{Logging, SparkConf} -import org.apache.spark.util.Utils - -/** - * Share codes for Scala and Python unit tests - */ -private[mqtt] class MQTTTestUtils extends Logging { - - private val persistenceDir = Utils.createTempDir() - private val brokerHost = "localhost" - private val brokerPort = findFreePort() - - private var broker: BrokerService = _ - private var connector: TransportConnector = _ - - def brokerUri: String = { - s"$brokerHost:$brokerPort" - } - - def setup(): Unit = { - broker = new BrokerService() - broker.setDataDirectoryFile(Utils.createTempDir()) - connector = new TransportConnector() - connector.setName("mqtt") - connector.setUri(new URI("mqtt://" + brokerUri)) - broker.addConnector(connector) - broker.start() - } - - def teardown(): Unit = { - if (broker != null) { - broker.stop() - broker = null - } - if (connector != null) { - connector.stop() - connector = null - } - Utils.deleteRecursively(persistenceDir) - } - - private def findFreePort(): Int = { - val candidatePort = RandomUtils.nextInt(1024, 65536) - Utils.startServiceOnPort(candidatePort, (trialPort: Int) => { - val socket = new ServerSocket(trialPort) - socket.close() - (null, trialPort) - }, new SparkConf())._2 - } - - def publishData(topic: String, data: String): Unit = { - var client: MqttClient = null - try { - val persistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath) - client = new MqttClient("tcp://" + brokerUri, MqttClient.generateClientId(), persistence) - client.connect() - if (client.isConnected) { - val msgTopic = client.getTopic(topic) - val message = new MqttMessage(data.getBytes(StandardCharsets.UTF_8)) - message.setQos(1) - message.setRetained(true) - - for (i <- 0 to 10) { - try { - msgTopic.publish(message) - } catch { - case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT => - // wait for Spark streaming to consume something from the message queue - Thread.sleep(50) - } - } - } - } finally { - if (client != null) { - client.disconnect() - client.close() - client = null - } - } - } - -} |