diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-06 01:47:53 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-06 01:47:53 -0800 |
commit | d0fd3b9ad238294346eb3465c489eabd41fb2380 (patch) | |
tree | d7f7f17fc842589f80c3ef16669741e3499d9692 /external/mqtt/src | |
parent | 977bcc36d4440ff562d5dbcc12449bf383d0d9e2 (diff) | |
download | spark-d0fd3b9ad238294346eb3465c489eabd41fb2380.tar.gz spark-d0fd3b9ad238294346eb3465c489eabd41fb2380.tar.bz2 spark-d0fd3b9ad238294346eb3465c489eabd41fb2380.zip |
Changed JavaStreamingContextWith*** to ***Function in streaming.api.java.*** package. Also fixed packages of Flume and MQTT tests.
Diffstat (limited to 'external/mqtt/src')
-rw-r--r-- | external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala (renamed from external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala) | 10 | ||||
-rw-r--r-- | external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala (renamed from external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTFunctions.scala) | 0 | ||||
-rw-r--r-- | external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala (renamed from external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTInputDStream.scala) | 0 | ||||
-rw-r--r-- | external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala (renamed from external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/package.scala) | 0 | ||||
-rw-r--r-- | external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java | 10 |
5 files changed, 9 insertions, 11 deletions
diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala index d814da0f0d..72124956fc 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala @@ -15,19 +15,19 @@ * limitations under the License. */ -package org.apache.spark.streaming.mqtt +package org.apache.spark.streaming.api.java.mqtt import scala.reflect.ClassTag import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} +import org.apache.spark.streaming.mqtt._ /** * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra * functions for creating MQTT input streams. */ -class JavaStreamingContextWithMQTT(javaStreamingContext: JavaStreamingContext) - extends JavaStreamingContext(javaStreamingContext.ssc) { +class MQTTFunctions(javaStreamingContext: JavaStreamingContext) { /** * Create an input stream that receives messages pushed by a MQTT publisher. @@ -39,7 +39,7 @@ class JavaStreamingContextWithMQTT(javaStreamingContext: JavaStreamingContext) topic: String ): JavaDStream[String] = { implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] - ssc.mqttStream(brokerUrl, topic) + javaStreamingContext.ssc.mqttStream(brokerUrl, topic) } /** @@ -54,6 +54,6 @@ class JavaStreamingContextWithMQTT(javaStreamingContext: JavaStreamingContext) storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): JavaDStream[String] = { implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] - ssc.mqttStream(brokerUrl, topic, storageLevel) + javaStreamingContext.ssc.mqttStream(brokerUrl, topic, storageLevel) } } diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTFunctions.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala index 86f4e9c724..86f4e9c724 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTFunctions.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala index c8987a3ee0..c8987a3ee0 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTInputDStream.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/package.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala index 28a944f57e..28a944f57e 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/package.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java index c1f41640dc..3ddb4d084f 100644 --- a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java +++ b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java @@ -19,6 +19,7 @@ package org.apache.spark.streaming.mqtt; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.mqtt.MQTTFunctions; import org.junit.Test; import org.apache.spark.streaming.LocalJavaStreamingContext; @@ -28,14 +29,11 @@ public class JavaMQTTStreamSuite extends LocalJavaStreamingContext { public void testMQTTStream() { String brokerUrl = "abc"; String topic = "def"; - JavaStreamingContextWithMQTT sscWithMQTT = new JavaStreamingContextWithMQTT(ssc); + MQTTFunctions mqttFunc = new MQTTFunctions(ssc); // tests the API, does not actually test data receiving - JavaDStream<String> test1 = sscWithMQTT.mqttStream(brokerUrl, topic); - JavaDStream<String> test2 = sscWithMQTT.mqttStream(brokerUrl, topic, + JavaDStream<String> test1 = mqttFunc.mqttStream(brokerUrl, topic); + JavaDStream<String> test2 = mqttFunc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2()); - - // To verify that JavaStreamingContextWithKafka is also StreamingContext - JavaDStream<String> socketStream = sscWithMQTT.socketTextStream("localhost", 9999); } } |