aboutsummaryrefslogtreecommitdiff
path: root/external/mqtt
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-01-06 01:47:53 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-01-06 01:47:53 -0800
commitd0fd3b9ad238294346eb3465c489eabd41fb2380 (patch)
treed7f7f17fc842589f80c3ef16669741e3499d9692 /external/mqtt
parent977bcc36d4440ff562d5dbcc12449bf383d0d9e2 (diff)
downloadspark-d0fd3b9ad238294346eb3465c489eabd41fb2380.tar.gz
spark-d0fd3b9ad238294346eb3465c489eabd41fb2380.tar.bz2
spark-d0fd3b9ad238294346eb3465c489eabd41fb2380.zip
Changed JavaStreamingContextWith*** to ***Function in streaming.api.java.*** package. Also fixed packages of Flume and MQTT tests.
Diffstat (limited to 'external/mqtt')
-rw-r--r--external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala (renamed from external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala)10
-rw-r--r--external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala (renamed from external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTFunctions.scala)0
-rw-r--r--external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala (renamed from external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTInputDStream.scala)0
-rw-r--r--external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala (renamed from external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/package.scala)0
-rw-r--r--external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java10
5 files changed, 9 insertions, 11 deletions
diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala
index d814da0f0d..72124956fc 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala
@@ -15,19 +15,19 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.mqtt
+package org.apache.spark.streaming.api.java.mqtt
import scala.reflect.ClassTag
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.mqtt._
/**
* Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
* functions for creating MQTT input streams.
*/
-class JavaStreamingContextWithMQTT(javaStreamingContext: JavaStreamingContext)
- extends JavaStreamingContext(javaStreamingContext.ssc) {
+class MQTTFunctions(javaStreamingContext: JavaStreamingContext) {
/**
* Create an input stream that receives messages pushed by a MQTT publisher.
@@ -39,7 +39,7 @@ class JavaStreamingContextWithMQTT(javaStreamingContext: JavaStreamingContext)
topic: String
): JavaDStream[String] = {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- ssc.mqttStream(brokerUrl, topic)
+ javaStreamingContext.ssc.mqttStream(brokerUrl, topic)
}
/**
@@ -54,6 +54,6 @@ class JavaStreamingContextWithMQTT(javaStreamingContext: JavaStreamingContext)
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): JavaDStream[String] = {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- ssc.mqttStream(brokerUrl, topic, storageLevel)
+ javaStreamingContext.ssc.mqttStream(brokerUrl, topic, storageLevel)
}
}
diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTFunctions.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala
index 86f4e9c724..86f4e9c724 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTFunctions.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala
diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index c8987a3ee0..c8987a3ee0 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/package.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala
index 28a944f57e..28a944f57e 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/package.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala
diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
index c1f41640dc..3ddb4d084f 100644
--- a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
+++ b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.mqtt;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.mqtt.MQTTFunctions;
import org.junit.Test;
import org.apache.spark.streaming.LocalJavaStreamingContext;
@@ -28,14 +29,11 @@ public class JavaMQTTStreamSuite extends LocalJavaStreamingContext {
public void testMQTTStream() {
String brokerUrl = "abc";
String topic = "def";
- JavaStreamingContextWithMQTT sscWithMQTT = new JavaStreamingContextWithMQTT(ssc);
+ MQTTFunctions mqttFunc = new MQTTFunctions(ssc);
// tests the API, does not actually test data receiving
- JavaDStream<String> test1 = sscWithMQTT.mqttStream(brokerUrl, topic);
- JavaDStream<String> test2 = sscWithMQTT.mqttStream(brokerUrl, topic,
+ JavaDStream<String> test1 = mqttFunc.mqttStream(brokerUrl, topic);
+ JavaDStream<String> test2 = mqttFunc.mqttStream(brokerUrl, topic,
StorageLevel.MEMORY_AND_DISK_SER_2());
-
- // To verify that JavaStreamingContextWithKafka is also StreamingContext
- JavaDStream<String> socketStream = sscWithMQTT.socketTextStream("localhost", 9999);
}
}