path: root/external/mqtt
diff options
Diffstat (limited to 'external/mqtt')
11 files changed, 0 insertions, 685 deletions
diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml
deleted file mode 100644
index d0d968782c..0000000000
--- a/external/mqtt/pom.xml
+++ /dev/null
@@ -1,104 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-parent_2.11</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-mqtt_2.11</artifactId>
- <properties>
- <sbt.project.name>streaming-mqtt</sbt.project.name>
- </properties>
- <packaging>jar</packaging>
- <name>Spark Project External MQTT</name>
- <url>http://spark.apache.org/</url>
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.eclipse.paho</groupId>
- <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
- <version>1.0.2</version>
- </dependency>
- <dependency>
- <groupId>org.scalacheck</groupId>
- <artifactId>scalacheck_${scala.binary.version}</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-core</artifactId>
- <version>5.7.0</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
- </dependency>
- </dependencies>
- <build>
- <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
- <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
- <plugins>
- <!-- Assemble a jar with test dependencies for Python tests -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-assembly-plugin</artifactId>
- <executions>
- <execution>
- <id>test-jar-with-dependencies</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- <configuration>
- <!-- Make sure the file path is same as the sbt build -->
- <finalName>spark-streaming-mqtt-test-${project.version}</finalName>
- <outputDirectory>${project.build.directory}/scala-${scala.binary.version}/</outputDirectory>
- <appendAssemblyId>false</appendAssemblyId>
- <!-- Don't publish it since it's only for Python tests -->
- <attach>false</attach>
- <descriptors>
- <descriptor>src/main/assembly/assembly.xml</descriptor>
- </descriptors>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
diff --git a/external/mqtt/src/main/assembly/assembly.xml b/external/mqtt/src/main/assembly/assembly.xml
deleted file mode 100644
index c110b01b34..0000000000
--- a/external/mqtt/src/main/assembly/assembly.xml
+++ /dev/null
@@ -1,44 +0,0 @@
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
- <id>test-jar-with-dependencies</id>
- <formats>
- <format>jar</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
- <fileSets>
- <fileSet>
- <directory>${project.build.directory}/scala-${scala.binary.version}/test-classes</directory>
- <outputDirectory></outputDirectory>
- </fileSet>
- </fileSets>
- <dependencySets>
- <dependencySet>
- <useTransitiveDependencies>true</useTransitiveDependencies>
- <scope>test</scope>
- <unpack>true</unpack>
- <excludes>
- <exclude>org.apache.hadoop:*:jar</exclude>
- <exclude>org.apache.zookeeper:*:jar</exclude>
- <exclude>org.apache.avro:*:jar</exclude>
- </excludes>
- </dependencySet>
- </dependencySets>
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
deleted file mode 100644
index cbad6f7fe4..0000000000
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ /dev/null
@@ -1,102 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.mqtt
-import java.nio.charset.StandardCharsets
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
-import org.eclipse.paho.client.mqttv3.MqttCallback
-import org.eclipse.paho.client.mqttv3.MqttClient
-import org.eclipse.paho.client.mqttv3.MqttMessage
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.receiver.Receiver
- * Input stream that subscribe messages from a Mqtt Broker.
- * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/
- * @param brokerUrl Url of remote mqtt publisher
- * @param topic topic name to subscribe to
- * @param storageLevel RDD storage level.
- */
-class MQTTInputDStream(
- _ssc: StreamingContext,
- brokerUrl: String,
- topic: String,
- storageLevel: StorageLevel
- ) extends ReceiverInputDStream[String](_ssc) {
- private[streaming] override def name: String = s"MQTT stream [$id]"
- def getReceiver(): Receiver[String] = {
- new MQTTReceiver(brokerUrl, topic, storageLevel)
- }
-class MQTTReceiver(
- brokerUrl: String,
- topic: String,
- storageLevel: StorageLevel
- ) extends Receiver[String](storageLevel) {
- def onStop() {
- }
- def onStart() {
- // Set up persistence for messages
- val persistence = new MemoryPersistence()
- // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
- val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
- // Callback automatically triggers as and when new message arrives on specified topic
- val callback = new MqttCallback() {
- // Handles Mqtt message
- override def messageArrived(topic: String, message: MqttMessage) {
- store(new String(message.getPayload(), StandardCharsets.UTF_8))
- }
- override def deliveryComplete(token: IMqttDeliveryToken) {
- }
- override def connectionLost(cause: Throwable) {
- restart("Connection lost ", cause)
- }
- }
- // Set up callback for MqttClient. This needs to happen before
- // connecting or subscribing, otherwise messages may be lost
- client.setCallback(callback)
- // Connect to MqttBroker
- client.connect()
- // Subscribe to Mqtt topic
- client.subscribe(topic)
- }
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
deleted file mode 100644
index 7b8d56d6fa..0000000000
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ /dev/null
@@ -1,92 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.mqtt
-import scala.reflect.ClassTag
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaReceiverInputDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-object MQTTUtils {
- /**
- * Create an input stream that receives messages pushed by a MQTT publisher.
- * @param ssc StreamingContext object
- * @param brokerUrl Url of remote MQTT publisher
- * @param topic Topic name to subscribe to
- * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
- */
- def createStream(
- ssc: StreamingContext,
- brokerUrl: String,
- topic: String,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): ReceiverInputDStream[String] = {
- new MQTTInputDStream(ssc, brokerUrl, topic, storageLevel)
- }
- /**
- * Create an input stream that receives messages pushed by a MQTT publisher.
- * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
- * @param jssc JavaStreamingContext object
- * @param brokerUrl Url of remote MQTT publisher
- * @param topic Topic name to subscribe to
- */
- def createStream(
- jssc: JavaStreamingContext,
- brokerUrl: String,
- topic: String
- ): JavaReceiverInputDStream[String] = {
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- createStream(jssc.ssc, brokerUrl, topic)
- }
- /**
- * Create an input stream that receives messages pushed by a MQTT publisher.
- * @param jssc JavaStreamingContext object
- * @param brokerUrl Url of remote MQTT publisher
- * @param topic Topic name to subscribe to
- * @param storageLevel RDD storage level.
- */
- def createStream(
- jssc: JavaStreamingContext,
- brokerUrl: String,
- topic: String,
- storageLevel: StorageLevel
- ): JavaReceiverInputDStream[String] = {
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- createStream(jssc.ssc, brokerUrl, topic, storageLevel)
- }
- * This is a helper class that wraps the methods in MQTTUtils into more Python-friendly class and
- * function so that it can be easily instantiated and called from Python's MQTTUtils.
- */
-private[mqtt] class MQTTUtilsPythonHelper {
- def createStream(
- jssc: JavaStreamingContext,
- brokerUrl: String,
- topic: String,
- storageLevel: StorageLevel
- ): JavaDStream[String] = {
- MQTTUtils.createStream(jssc, brokerUrl, topic, storageLevel)
- }
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package-info.java b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package-info.java
deleted file mode 100644
index 728e0d8663..0000000000
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- * MQTT receiver for Spark Streaming.
- */
-package org.apache.spark.streaming.mqtt; \ No newline at end of file
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala
deleted file mode 100644
index 63d0d13818..0000000000
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala
+++ /dev/null
@@ -1,23 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming
- * MQTT receiver for Spark Streaming.
- */
-package object mqtt
diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
deleted file mode 100644
index cfedb5a042..0000000000
--- a/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming;
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.junit.After;
-import org.junit.Before;
-public abstract class LocalJavaStreamingContext {
- protected transient JavaStreamingContext ssc;
- @Before
- public void setUp() {
- SparkConf conf = new SparkConf()
- .setMaster("local[2]")
- .setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
- ssc = new JavaStreamingContext(conf, new Duration(1000));
- ssc.checkpoint("checkpoint");
- }
- @After
- public void tearDown() {
- ssc.stop();
- ssc = null;
- }
diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
deleted file mode 100644
index ce5aa1e0cd..0000000000
--- a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
+++ /dev/null
@@ -1,37 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.mqtt;
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.junit.Test;
-import org.apache.spark.streaming.LocalJavaStreamingContext;
-public class JavaMQTTStreamSuite extends LocalJavaStreamingContext {
- @Test
- public void testMQTTStream() {
- String brokerUrl = "abc";
- String topic = "def";
- // tests the API, does not actually test data receiving
- JavaReceiverInputDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic);
- JavaReceiverInputDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic,
- StorageLevel.MEMORY_AND_DISK_SER_2());
- }
diff --git a/external/mqtt/src/test/resources/log4j.properties b/external/mqtt/src/test/resources/log4j.properties
deleted file mode 100644
index 75e3b53a09..0000000000
--- a/external/mqtt/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-# http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-# Ignore messages below warning level from Jetty, because it's a bit verbose
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
deleted file mode 100644
index fdcd18c6fb..0000000000
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ /dev/null
@@ -1,79 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.mqtt
-import scala.concurrent.duration._
-import scala.language.postfixOps
-import org.scalatest.BeforeAndAfter
-import org.scalatest.concurrent.Eventually
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Milliseconds, StreamingContext}
-class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter {
- private val batchDuration = Milliseconds(500)
- private val master = "local[2]"
- private val framework = this.getClass.getSimpleName
- private val topic = "def"
- private var ssc: StreamingContext = _
- private var mqttTestUtils: MQTTTestUtils = _
- before {
- ssc = new StreamingContext(master, framework, batchDuration)
- mqttTestUtils = new MQTTTestUtils
- mqttTestUtils.setup()
- }
- after {
- if (ssc != null) {
- ssc.stop()
- ssc = null
- }
- if (mqttTestUtils != null) {
- mqttTestUtils.teardown()
- mqttTestUtils = null
- }
- }
- test("mqtt input stream") {
- val sendMessage = "MQTT demo for spark streaming"
- val receiveStream = MQTTUtils.createStream(ssc, "tcp://" + mqttTestUtils.brokerUri, topic,
- StorageLevel.MEMORY_ONLY)
- @volatile var receiveMessage: List[String] = List()
- receiveStream.foreachRDD { rdd =>
- if (rdd.collect.length > 0) {
- receiveMessage = receiveMessage ::: List(rdd.first)
- receiveMessage
- }
- }
- ssc.start()
- // Retry it because we don't know when the receiver will start.
- eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
- mqttTestUtils.publishData(topic, sendMessage)
- assert(sendMessage.equals(receiveMessage(0)))
- }
- ssc.stop()
- }
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala
deleted file mode 100644
index 3680c13605..0000000000
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala
+++ /dev/null
@@ -1,111 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.mqtt
-import java.net.{ServerSocket, URI}
-import java.nio.charset.StandardCharsets
-import scala.language.postfixOps
-import org.apache.activemq.broker.{BrokerService, TransportConnector}
-import org.apache.commons.lang3.RandomUtils
-import org.eclipse.paho.client.mqttv3._
-import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.util.Utils
- * Share codes for Scala and Python unit tests
- */
-private[mqtt] class MQTTTestUtils extends Logging {
- private val persistenceDir = Utils.createTempDir()
- private val brokerHost = "localhost"
- private val brokerPort = findFreePort()
- private var broker: BrokerService = _
- private var connector: TransportConnector = _
- def brokerUri: String = {
- s"$brokerHost:$brokerPort"
- }
- def setup(): Unit = {
- broker = new BrokerService()
- broker.setDataDirectoryFile(Utils.createTempDir())
- connector = new TransportConnector()
- connector.setName("mqtt")
- connector.setUri(new URI("mqtt://" + brokerUri))
- broker.addConnector(connector)
- broker.start()
- }
- def teardown(): Unit = {
- if (broker != null) {
- broker.stop()
- broker = null
- }
- if (connector != null) {
- connector.stop()
- connector = null
- }
- Utils.deleteRecursively(persistenceDir)
- }
- private def findFreePort(): Int = {
- val candidatePort = RandomUtils.nextInt(1024, 65536)
- Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
- val socket = new ServerSocket(trialPort)
- socket.close()
- (null, trialPort)
- }, new SparkConf())._2
- }
- def publishData(topic: String, data: String): Unit = {
- var client: MqttClient = null
- try {
- val persistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
- client = new MqttClient("tcp://" + brokerUri, MqttClient.generateClientId(), persistence)
- client.connect()
- if (client.isConnected) {
- val msgTopic = client.getTopic(topic)
- val message = new MqttMessage(data.getBytes(StandardCharsets.UTF_8))
- message.setQos(1)
- message.setRetained(true)
- for (i <- 0 to 10) {
- try {
- msgTopic.publish(message)
- } catch {
- case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
- // wait for Spark streaming to consume something from the message queue
- Thread.sleep(50)
- }
- }
- }
- } finally {
- if (client != null) {
- client.disconnect()
- client.close()
- client = null
- }
- }
- }