diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-06 01:47:53 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-06 01:47:53 -0800 |
commit | d0fd3b9ad238294346eb3465c489eabd41fb2380 (patch) | |
tree | d7f7f17fc842589f80c3ef16669741e3499d9692 /external/zeromq/src | |
parent | 977bcc36d4440ff562d5dbcc12449bf383d0d9e2 (diff) | |
download | spark-d0fd3b9ad238294346eb3465c489eabd41fb2380.tar.gz spark-d0fd3b9ad238294346eb3465c489eabd41fb2380.tar.bz2 spark-d0fd3b9ad238294346eb3465c489eabd41fb2380.zip |
Changed JavaStreamingContextWith*** to ***Function in streaming.api.java.*** package. Also fixed packages of Flume and MQTT tests.
Diffstat (limited to 'external/zeromq/src')
-rw-r--r-- | external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala (renamed from external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala) | 12 | ||||
-rw-r--r-- | external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java | 12 |
2 files changed, 11 insertions, 13 deletions
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala index dc5d1f05be..a9bbce71f5 100644 --- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.streaming.zeromq +package org.apache.spark.streaming.api.java.zeromq import scala.reflect.ClassTag import scala.collection.JavaConversions._ @@ -27,13 +27,13 @@ import akka.zeromq.Subscribe import org.apache.spark.storage.StorageLevel import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} +import org.apache.spark.streaming.zeromq._ /** * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra * functions for creating ZeroMQ input streams. */ -class JavaStreamingContextWithZeroMQ(javaStreamingContext: JavaStreamingContext) - extends JavaStreamingContext(javaStreamingContext.ssc) { +class ZeroMQFunctions(javaStreamingContext: JavaStreamingContext) { /** * Create an input stream that receives messages pushed by a zeromq publisher. @@ -55,7 +55,7 @@ class JavaStreamingContextWithZeroMQ(javaStreamingContext: JavaStreamingContext) implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator - ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel, supervisorStrategy) + javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel, supervisorStrategy) } /** @@ -77,7 +77,7 @@ class JavaStreamingContextWithZeroMQ(javaStreamingContext: JavaStreamingContext) implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator - ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel) + javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel) } /** @@ -97,6 +97,6 @@ class JavaStreamingContextWithZeroMQ(javaStreamingContext: JavaStreamingContext) implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator - ssc.zeroMQStream[T](publisherUrl, subscribe, fn) + javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn) } } diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java index 96af7d737d..b020ae4cef 100644 --- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java +++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java @@ -17,6 +17,7 @@ package org.apache.spark.streaming.zeromq; +import org.apache.spark.streaming.api.java.zeromq.ZeroMQFunctions; import org.junit.Test; import akka.actor.SupervisorStrategy; @@ -32,7 +33,7 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext { @Test // tests the API, does not actually test data receiving public void testZeroMQStream() { - JavaStreamingContextWithZeroMQ sscWithZeroMQ = new JavaStreamingContextWithZeroMQ(ssc); + ZeroMQFunctions zeromqFunc = new ZeroMQFunctions(ssc); String publishUrl = "abc"; Subscribe subscribe = new Subscribe((ByteString)null); Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() { @@ -42,14 +43,11 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext { } }; - JavaDStream<String> test1 = sscWithZeroMQ.<String>zeroMQStream( + JavaDStream<String> test1 = zeromqFunc.<String>zeroMQStream( publishUrl, subscribe, bytesToObjects); - JavaDStream<String> test2 = sscWithZeroMQ.<String>zeroMQStream( + JavaDStream<String> test2 = zeromqFunc.<String>zeroMQStream( publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2()); - JavaDStream<String> test3 = sscWithZeroMQ.<String>zeroMQStream( + JavaDStream<String> test3 = zeromqFunc.<String>zeroMQStream( publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), SupervisorStrategy.defaultStrategy()); - - // To verify that JavaStreamingContextWithKafka is also StreamingContext - JavaDStream<String> socketStream = sscWithZeroMQ.socketTextStream("localhost", 9999); } } |