aboutsummaryrefslogtreecommitdiff
path: root/external/mqtt/src
diff options
context:
space:
mode:
Diffstat (limited to 'external/mqtt/src')
-rw-r--r--external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala41
-rw-r--r--external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala12
-rw-r--r--external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java6
-rw-r--r--external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala6
4 files changed, 33 insertions, 32 deletions
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 1204cfba39..0beee8b415 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -39,6 +39,7 @@ import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.receiver.Receiver
/**
* Input stream that subscribe messages from a Mqtt Broker.
@@ -49,38 +50,36 @@ import org.apache.spark.streaming.dstream._
*/
private[streaming]
-class MQTTInputDStream[T: ClassTag](
+class MQTTInputDStream(
@transient ssc_ : StreamingContext,
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
- ) extends NetworkInputDStream[T](ssc_) with Logging {
-
- def getReceiver(): NetworkReceiver[T] = {
- new MQTTReceiver(brokerUrl, topic, storageLevel).asInstanceOf[NetworkReceiver[T]]
+ ) extends ReceiverInputDStream[String](ssc_) with Logging {
+
+ def getReceiver(): Receiver[String] = {
+ new MQTTReceiver(brokerUrl, topic, storageLevel)
}
}
-private[streaming]
-class MQTTReceiver(brokerUrl: String,
- topic: String,
- storageLevel: StorageLevel
- ) extends NetworkReceiver[Any] {
- lazy protected val blockGenerator = new BlockGenerator(storageLevel)
+private[streaming]
+class MQTTReceiver(
+ brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel
+ ) extends Receiver[String](storageLevel) {
def onStop() {
- blockGenerator.stop()
- }
+ }
+
def onStart() {
- blockGenerator.start()
-
- // Set up persistence for messages
- var peristance: MqttClientPersistence = new MemoryPersistence()
+ // Set up persistence for messages
+ val persistence = new MemoryPersistence()
// Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
- var client: MqttClient = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance)
+ val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
// Connect to MqttBroker
client.connect()
@@ -89,18 +88,18 @@ class MQTTReceiver(brokerUrl: String,
client.subscribe(topic)
// Callback automatically triggers as and when new message arrives on specified topic
- var callback: MqttCallback = new MqttCallback() {
+ val callback: MqttCallback = new MqttCallback() {
// Handles Mqtt message
override def messageArrived(arg0: String, arg1: MqttMessage) {
- blockGenerator += new String(arg1.getPayload())
+ store(new String(arg1.getPayload()))
}
override def deliveryComplete(arg0: IMqttDeliveryToken) {
}
override def connectionLost(arg0: Throwable) {
- logInfo("Connection lost " + arg0)
+ restart("Connection lost ", arg0)
}
}
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index 1b09ee5dc8..c5ffe51f99 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -19,9 +19,9 @@ package org.apache.spark.streaming.mqtt
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream}
import scala.reflect.ClassTag
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
object MQTTUtils {
/**
@@ -36,8 +36,8 @@ object MQTTUtils {
brokerUrl: String,
topic: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[String] = {
- new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel)
+ ): ReceiverInputDStream[String] = {
+ new MQTTInputDStream(ssc, brokerUrl, topic, storageLevel)
}
/**
@@ -51,7 +51,7 @@ object MQTTUtils {
jssc: JavaStreamingContext,
brokerUrl: String,
topic: String
- ): JavaDStream[String] = {
+ ): JavaReceiverInputDStream[String] = {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, brokerUrl, topic)
}
@@ -68,7 +68,7 @@ object MQTTUtils {
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
- ): JavaDStream[String] = {
+ ): JavaReceiverInputDStream[String] = {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, brokerUrl, topic, storageLevel)
}
diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
index 44743aaecf..ce5aa1e0cd 100644
--- a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
+++ b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
@@ -18,7 +18,7 @@
package org.apache.spark.streaming.mqtt;
import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.junit.Test;
import org.apache.spark.streaming.LocalJavaStreamingContext;
@@ -30,8 +30,8 @@ public class JavaMQTTStreamSuite extends LocalJavaStreamingContext {
String topic = "def";
// tests the API, does not actually test data receiving
- JavaDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic);
- JavaDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic,
+ JavaReceiverInputDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic);
+ JavaReceiverInputDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic,
StorageLevel.MEMORY_AND_DISK_SER_2());
}
}
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 89c40ad461..467fd263e2 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.mqtt
import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
class MQTTStreamSuite extends TestSuiteBase {
@@ -28,8 +29,9 @@ class MQTTStreamSuite extends TestSuiteBase {
val topic = "def"
// tests the API, does not actually test data receiving
- val test1 = MQTTUtils.createStream(ssc, brokerUrl, topic)
- val test2 = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
+ val test2: ReceiverInputDStream[String] =
+ MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
// TODO: Actually test receiving data
ssc.stop()