diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-06 01:47:53 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-06 01:47:53 -0800 |
commit | d0fd3b9ad238294346eb3465c489eabd41fb2380 (patch) | |
tree | d7f7f17fc842589f80c3ef16669741e3499d9692 /external/flume/src | |
parent | 977bcc36d4440ff562d5dbcc12449bf383d0d9e2 (diff) | |
download | spark-d0fd3b9ad238294346eb3465c489eabd41fb2380.tar.gz spark-d0fd3b9ad238294346eb3465c489eabd41fb2380.tar.bz2 spark-d0fd3b9ad238294346eb3465c489eabd41fb2380.zip |
Changed JavaStreamingContextWith*** to ***Function in streaming.api.java.*** package. Also fixed packages of Flume and MQTT tests.
Diffstat (limited to 'external/flume/src')
-rw-r--r-- | external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala (renamed from external/flume/src/main/scala/org/apache/spark/streaming/flume/JavaStreamingContextWithFlume.scala) | 10 | ||||
-rw-r--r-- | external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java (renamed from external/flume/src/test/java/JavaFlumeStreamSuite.java) | 13 |
2 files changed, 10 insertions, 13 deletions
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/JavaStreamingContextWithFlume.scala b/external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala index 4e66ae3535..3347d19796 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/JavaStreamingContextWithFlume.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala @@ -15,24 +15,24 @@ * limitations under the License. */ -package org.apache.spark.streaming.flume +package org.apache.spark.streaming.api.java.flume import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} +import org.apache.spark.streaming.flume._ import org.apache.spark.storage.StorageLevel /** * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra * functions for creating Flume input streams. */ -class JavaStreamingContextWithFlume(javaStreamingContext: JavaStreamingContext) - extends JavaStreamingContext(javaStreamingContext.ssc) { +class FlumeFunctions(javaStreamingContext: JavaStreamingContext) { /** * Creates a input stream from a Flume source. * @param hostname Hostname of the slave machine to which the flume data will be sent * @param port Port of the slave machine to which the flume data will be sent */ def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = { - ssc.flumeStream(hostname, port) + javaStreamingContext.ssc.flumeStream(hostname, port) } /** @@ -43,6 +43,6 @@ class JavaStreamingContextWithFlume(javaStreamingContext: JavaStreamingContext) */ def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel): JavaDStream[SparkFlumeEvent] = { - ssc.flumeStream(hostname, port, storageLevel) + javaStreamingContext.ssc.flumeStream(hostname, port, storageLevel) } } diff --git a/external/flume/src/test/java/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java index deffc78c4c..5930fee925 100644 --- a/external/flume/src/test/java/JavaFlumeStreamSuite.java +++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java @@ -1,4 +1,4 @@ -/* +package org.apache.spark.streaming.flume;/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -18,21 +18,18 @@ import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.LocalJavaStreamingContext; import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.flume.JavaStreamingContextWithFlume; +import org.apache.spark.streaming.api.java.flume.FlumeFunctions; import org.apache.spark.streaming.flume.SparkFlumeEvent; import org.junit.Test; public class JavaFlumeStreamSuite extends LocalJavaStreamingContext { @Test public void testFlumeStream() { - JavaStreamingContextWithFlume sscWithFlume = new JavaStreamingContextWithFlume(ssc); + FlumeFunctions flumeFunc = new FlumeFunctions(ssc); // tests the API, does not actually test data receiving - JavaDStream<SparkFlumeEvent> test1 = sscWithFlume.flumeStream("localhost", 12345); - JavaDStream<SparkFlumeEvent> test2 = sscWithFlume.flumeStream("localhost", 12345, + JavaDStream<SparkFlumeEvent> test1 = flumeFunc.flumeStream("localhost", 12345); + JavaDStream<SparkFlumeEvent> test2 = flumeFunc.flumeStream("localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2()); - - // To verify that JavaStreamingContextWithKafka is also StreamingContext - JavaDStream<String> socketStream = sscWithFlume.socketTextStream("localhost", 9999); } } |