diff options
15 files changed, 67 insertions, 76 deletions
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java index e53c4f9e83..64832a9721 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java @@ -20,7 +20,7 @@ package org.apache.spark.streaming.examples; import org.apache.spark.api.java.function.Function; import org.apache.spark.streaming.*; import org.apache.spark.streaming.api.java.*; -import org.apache.spark.streaming.flume.JavaStreamingContextWithFlume; +import org.apache.spark.streaming.api.java.flume.FlumeFunctions; import org.apache.spark.streaming.flume.SparkFlumeEvent; /** @@ -52,8 +52,8 @@ public class JavaFlumeEventCount { JavaStreamingContext ssc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval, System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); - JavaStreamingContextWithFlume sscWithFlume = new JavaStreamingContextWithFlume(ssc); - JavaDStream<SparkFlumeEvent> flumeStream = sscWithFlume.flumeStream("localhost", port); + FlumeFunctions flumeFunc = new FlumeFunctions(ssc); + JavaDStream<SparkFlumeEvent> flumeStream = flumeFunc.flumeStream("localhost", port); flumeStream.count(); diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java index de0420ca83..207ce8cd4f 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java @@ -29,7 +29,7 @@ import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.apache.spark.streaming.kafka.JavaStreamingContextWithKafka; +import org.apache.spark.streaming.api.java.kafka.KafkaFunctions; import scala.Tuple2; /** @@ -64,8 +64,8 @@ public class JavaKafkaWordCount { topicMap.put(topic, numThreads); } - JavaStreamingContextWithKafka sscWithKafka = new JavaStreamingContextWithKafka(ssc); - JavaPairDStream<String, String> messages = sscWithKafka.kafkaStream(args[1], args[2], topicMap); + KafkaFunctions kafkaFunc = new KafkaFunctions(ssc); + JavaPairDStream<String, String> messages = kafkaFunc.kafkaStream(args[1], args[2], topicMap); JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { @Override diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/JavaStreamingContextWithFlume.scala b/external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala index 4e66ae3535..3347d19796 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/JavaStreamingContextWithFlume.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala @@ -15,24 +15,24 @@ * limitations under the License. */ -package org.apache.spark.streaming.flume +package org.apache.spark.streaming.api.java.flume import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} +import org.apache.spark.streaming.flume._ import org.apache.spark.storage.StorageLevel /** * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra * functions for creating Flume input streams. */ -class JavaStreamingContextWithFlume(javaStreamingContext: JavaStreamingContext) - extends JavaStreamingContext(javaStreamingContext.ssc) { +class FlumeFunctions(javaStreamingContext: JavaStreamingContext) { /** * Creates a input stream from a Flume source. * @param hostname Hostname of the slave machine to which the flume data will be sent * @param port Port of the slave machine to which the flume data will be sent */ def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = { - ssc.flumeStream(hostname, port) + javaStreamingContext.ssc.flumeStream(hostname, port) } /** @@ -43,6 +43,6 @@ class JavaStreamingContextWithFlume(javaStreamingContext: JavaStreamingContext) */ def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel): JavaDStream[SparkFlumeEvent] = { - ssc.flumeStream(hostname, port, storageLevel) + javaStreamingContext.ssc.flumeStream(hostname, port, storageLevel) } } diff --git a/external/flume/src/test/java/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java index deffc78c4c..5930fee925 100644 --- a/external/flume/src/test/java/JavaFlumeStreamSuite.java +++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java @@ -1,4 +1,4 @@ -/* +package org.apache.spark.streaming.flume;/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -18,21 +18,18 @@ import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.LocalJavaStreamingContext; import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.flume.JavaStreamingContextWithFlume; +import org.apache.spark.streaming.api.java.flume.FlumeFunctions; import org.apache.spark.streaming.flume.SparkFlumeEvent; import org.junit.Test; public class JavaFlumeStreamSuite extends LocalJavaStreamingContext { @Test public void testFlumeStream() { - JavaStreamingContextWithFlume sscWithFlume = new JavaStreamingContextWithFlume(ssc); + FlumeFunctions flumeFunc = new FlumeFunctions(ssc); // tests the API, does not actually test data receiving - JavaDStream<SparkFlumeEvent> test1 = sscWithFlume.flumeStream("localhost", 12345); - JavaDStream<SparkFlumeEvent> test2 = sscWithFlume.flumeStream("localhost", 12345, + JavaDStream<SparkFlumeEvent> test1 = flumeFunc.flumeStream("localhost", 12345); + JavaDStream<SparkFlumeEvent> test2 = flumeFunc.flumeStream("localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2()); - - // To verify that JavaStreamingContextWithKafka is also StreamingContext - JavaDStream<String> socketStream = sscWithFlume.socketTextStream("localhost", 9999); } } diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala index ab0e8a6c8d..491331bb37 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.streaming.kafka +package org.apache.spark.streaming.api.java.kafka import scala.reflect.ClassTag import scala.collection.JavaConversions._ @@ -27,13 +27,13 @@ import kafka.serializer.Decoder import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream} +import org.apache.spark.streaming.kafka._ /** * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra * functions for creating Kafka input streams. */ -class JavaStreamingContextWithKafka(javaStreamingContext: JavaStreamingContext) - extends JavaStreamingContext(javaStreamingContext.ssc) { +class KafkaFunctions(javaStreamingContext: JavaStreamingContext) { /** * Create an input stream that pulls messages form a Kafka Broker. @@ -49,7 +49,7 @@ class JavaStreamingContextWithKafka(javaStreamingContext: JavaStreamingContext) ): JavaPairDStream[String, String] = { implicit val cmt: ClassTag[String] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] - ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*)) + javaStreamingContext.ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*)) } /** @@ -69,7 +69,7 @@ class JavaStreamingContextWithKafka(javaStreamingContext: JavaStreamingContext) ): JavaPairDStream[String, String] = { implicit val cmt: ClassTag[String] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] - ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) + javaStreamingContext.ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) } /** @@ -101,7 +101,7 @@ class JavaStreamingContextWithKafka(javaStreamingContext: JavaStreamingContext) implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]] implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]] - ssc.kafkaStream[K, V, U, T]( + javaStreamingContext.ssc.kafkaStream[K, V, U, T]( kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) } } diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java index 66236df662..fdea96e506 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -18,6 +18,8 @@ package org.apache.spark.streaming.kafka; import java.util.HashMap; + +import org.apache.spark.streaming.api.java.kafka.KafkaFunctions; import org.junit.Test; import com.google.common.collect.Maps; import kafka.serializer.StringDecoder; @@ -31,21 +33,18 @@ public class JavaKafkaStreamSuite extends LocalJavaStreamingContext { public void testKafkaStream() { HashMap<String, Integer> topics = Maps.newHashMap(); - JavaStreamingContextWithKafka sscWithKafka = new JavaStreamingContextWithKafka(ssc); + KafkaFunctions kafkaFunc = new KafkaFunctions(ssc); // tests the API, does not actually test data receiving - JavaPairDStream<String, String> test1 = sscWithKafka.kafkaStream("localhost:12345", "group", topics); - JavaPairDStream<String, String> test2 = sscWithKafka.kafkaStream("localhost:12345", "group", topics, + JavaPairDStream<String, String> test1 = kafkaFunc.kafkaStream("localhost:12345", "group", topics); + JavaPairDStream<String, String> test2 = kafkaFunc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2()); HashMap<String, String> kafkaParams = Maps.newHashMap(); - kafkaParams.put("zookeeper.connect","localhost:12345"); + kafkaParams.put("zookeeper.connect", "localhost:12345"); kafkaParams.put("group.id","consumer-group"); - JavaPairDStream<String, String> test3 = sscWithKafka.kafkaStream( + JavaPairDStream<String, String> test3 = kafkaFunc.kafkaStream( String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2()); - - // To verify that JavaStreamingContextWithKafka is also StreamingContext - JavaDStream<String> socketStream = sscWithKafka.socketTextStream("localhost", 9999); } } diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala index d814da0f0d..72124956fc 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala @@ -15,19 +15,19 @@ * limitations under the License. */ -package org.apache.spark.streaming.mqtt +package org.apache.spark.streaming.api.java.mqtt import scala.reflect.ClassTag import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} +import org.apache.spark.streaming.mqtt._ /** * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra * functions for creating MQTT input streams. */ -class JavaStreamingContextWithMQTT(javaStreamingContext: JavaStreamingContext) - extends JavaStreamingContext(javaStreamingContext.ssc) { +class MQTTFunctions(javaStreamingContext: JavaStreamingContext) { /** * Create an input stream that receives messages pushed by a MQTT publisher. @@ -39,7 +39,7 @@ class JavaStreamingContextWithMQTT(javaStreamingContext: JavaStreamingContext) topic: String ): JavaDStream[String] = { implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] - ssc.mqttStream(brokerUrl, topic) + javaStreamingContext.ssc.mqttStream(brokerUrl, topic) } /** @@ -54,6 +54,6 @@ class JavaStreamingContextWithMQTT(javaStreamingContext: JavaStreamingContext) storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): JavaDStream[String] = { implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] - ssc.mqttStream(brokerUrl, topic, storageLevel) + javaStreamingContext.ssc.mqttStream(brokerUrl, topic, storageLevel) } } diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTFunctions.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala index 86f4e9c724..86f4e9c724 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTFunctions.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala index c8987a3ee0..c8987a3ee0 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTInputDStream.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/package.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala index 28a944f57e..28a944f57e 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/package.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java index c1f41640dc..3ddb4d084f 100644 --- a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java +++ b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java @@ -19,6 +19,7 @@ package org.apache.spark.streaming.mqtt; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.mqtt.MQTTFunctions; import org.junit.Test; import org.apache.spark.streaming.LocalJavaStreamingContext; @@ -28,14 +29,11 @@ public class JavaMQTTStreamSuite extends LocalJavaStreamingContext { public void testMQTTStream() { String brokerUrl = "abc"; String topic = "def"; - JavaStreamingContextWithMQTT sscWithMQTT = new JavaStreamingContextWithMQTT(ssc); + MQTTFunctions mqttFunc = new MQTTFunctions(ssc); // tests the API, does not actually test data receiving - JavaDStream<String> test1 = sscWithMQTT.mqttStream(brokerUrl, topic); - JavaDStream<String> test2 = sscWithMQTT.mqttStream(brokerUrl, topic, + JavaDStream<String> test1 = mqttFunc.mqttStream(brokerUrl, topic); + JavaDStream<String> test2 = mqttFunc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2()); - - // To verify that JavaStreamingContextWithKafka is also StreamingContext - JavaDStream<String> socketStream = sscWithMQTT.socketTextStream("localhost", 9999); } } diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala index 0250364331..22e297a03a 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala @@ -15,20 +15,20 @@ * limitations under the License. */ -package org.apache.spark.streaming.twitter +package org.apache.spark.streaming.api.java.twitter import twitter4j.Status import twitter4j.auth.Authorization import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} +import org.apache.spark.streaming.twitter._ /** * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra * functions for creating Twitter input streams. */ -class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext) - extends JavaStreamingContext(javaStreamingContext.ssc) { +class TwitterFunctions(javaStreamingContext: JavaStreamingContext) { /** * Create a input stream that returns tweets received from Twitter using Twitter4J's default @@ -37,7 +37,7 @@ class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext * twitter4j.oauth.accessTokenSecret. */ def twitterStream(): JavaDStream[Status] = { - ssc.twitterStream(None) + javaStreamingContext.ssc.twitterStream(None) } /** @@ -48,7 +48,7 @@ class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext * @param filters Set of filter strings to get only those tweets that match them */ def twitterStream(filters: Array[String]): JavaDStream[Status] = { - ssc.twitterStream(None, filters) + javaStreamingContext.ssc.twitterStream(None, filters) } /** @@ -60,7 +60,7 @@ class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext * @param storageLevel Storage level to use for storing the received objects */ def twitterStream(filters: Array[String], storageLevel: StorageLevel): JavaDStream[Status] = { - ssc.twitterStream(None, filters, storageLevel) + javaStreamingContext.ssc.twitterStream(None, filters, storageLevel) } /** @@ -68,7 +68,7 @@ class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext * @param twitterAuth Twitter4J Authorization */ def twitterStream(twitterAuth: Authorization): JavaDStream[Status] = { - ssc.twitterStream(Some(twitterAuth)) + javaStreamingContext.ssc.twitterStream(Some(twitterAuth)) } /** @@ -80,7 +80,7 @@ class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext twitterAuth: Authorization, filters: Array[String] ): JavaDStream[Status] = { - ssc.twitterStream(Some(twitterAuth), filters) + javaStreamingContext.ssc.twitterStream(Some(twitterAuth), filters) } /** @@ -94,6 +94,6 @@ class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext filters: Array[String], storageLevel: StorageLevel ): JavaDStream[Status] = { - ssc.twitterStream(Some(twitterAuth), filters, storageLevel) + javaStreamingContext.ssc.twitterStream(Some(twitterAuth), filters, storageLevel) } } diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java index 34e4fbdd85..4564d6cd33 100644 --- a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java +++ b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java @@ -18,6 +18,8 @@ package org.apache.spark.streaming.twitter; import java.util.Arrays; + +import org.apache.spark.streaming.api.java.twitter.TwitterFunctions; import org.junit.Test; import twitter4j.Status; @@ -31,21 +33,18 @@ import org.apache.spark.streaming.api.java.JavaDStream; public class JavaTwitterStreamSuite extends LocalJavaStreamingContext { @Test public void testTwitterStream() { - JavaStreamingContextWithTwitter sscWithTwitter = new JavaStreamingContextWithTwitter(ssc); + TwitterFunctions twitterFunc = new TwitterFunctions(ssc); String[] filters = (String[])Arrays.<String>asList("filter1", "filter2").toArray(); Authorization auth = NullAuthorization.getInstance(); // tests the API, does not actually test data receiving - JavaDStream<Status> test1 = sscWithTwitter.twitterStream(); - JavaDStream<Status> test2 = sscWithTwitter.twitterStream(filters); + JavaDStream<Status> test1 = twitterFunc.twitterStream(); + JavaDStream<Status> test2 = twitterFunc.twitterStream(filters); JavaDStream<Status> test3 = - sscWithTwitter.twitterStream(filters, StorageLevel.MEMORY_AND_DISK_SER_2()); - JavaDStream<Status> test4 = sscWithTwitter.twitterStream(auth); - JavaDStream<Status> test5 = sscWithTwitter.twitterStream(auth, filters); + twitterFunc.twitterStream(filters, StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaDStream<Status> test4 = twitterFunc.twitterStream(auth); + JavaDStream<Status> test5 = twitterFunc.twitterStream(auth, filters); JavaDStream<Status> test6 = - sscWithTwitter.twitterStream(auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2()); - - // To verify that JavaStreamingContextWithKafka is also StreamingContext - JavaDStream<String> socketStream = sscWithTwitter.socketTextStream("localhost", 9999); + twitterFunc.twitterStream(auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2()); } } diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala index dc5d1f05be..a9bbce71f5 100644 --- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.streaming.zeromq +package org.apache.spark.streaming.api.java.zeromq import scala.reflect.ClassTag import scala.collection.JavaConversions._ @@ -27,13 +27,13 @@ import akka.zeromq.Subscribe import org.apache.spark.storage.StorageLevel import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} +import org.apache.spark.streaming.zeromq._ /** * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra * functions for creating ZeroMQ input streams. */ -class JavaStreamingContextWithZeroMQ(javaStreamingContext: JavaStreamingContext) - extends JavaStreamingContext(javaStreamingContext.ssc) { +class ZeroMQFunctions(javaStreamingContext: JavaStreamingContext) { /** * Create an input stream that receives messages pushed by a zeromq publisher. @@ -55,7 +55,7 @@ class JavaStreamingContextWithZeroMQ(javaStreamingContext: JavaStreamingContext) implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator - ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel, supervisorStrategy) + javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel, supervisorStrategy) } /** @@ -77,7 +77,7 @@ class JavaStreamingContextWithZeroMQ(javaStreamingContext: JavaStreamingContext) implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator - ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel) + javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel) } /** @@ -97,6 +97,6 @@ class JavaStreamingContextWithZeroMQ(javaStreamingContext: JavaStreamingContext) implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator - ssc.zeroMQStream[T](publisherUrl, subscribe, fn) + javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn) } } diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java index 96af7d737d..b020ae4cef 100644 --- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java +++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java @@ -17,6 +17,7 @@ package org.apache.spark.streaming.zeromq; +import org.apache.spark.streaming.api.java.zeromq.ZeroMQFunctions; import org.junit.Test; import akka.actor.SupervisorStrategy; @@ -32,7 +33,7 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext { @Test // tests the API, does not actually test data receiving public void testZeroMQStream() { - JavaStreamingContextWithZeroMQ sscWithZeroMQ = new JavaStreamingContextWithZeroMQ(ssc); + ZeroMQFunctions zeromqFunc = new ZeroMQFunctions(ssc); String publishUrl = "abc"; Subscribe subscribe = new Subscribe((ByteString)null); Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() { @@ -42,14 +43,11 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext { } }; - JavaDStream<String> test1 = sscWithZeroMQ.<String>zeroMQStream( + JavaDStream<String> test1 = zeromqFunc.<String>zeroMQStream( publishUrl, subscribe, bytesToObjects); - JavaDStream<String> test2 = sscWithZeroMQ.<String>zeroMQStream( + JavaDStream<String> test2 = zeromqFunc.<String>zeroMQStream( publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2()); - JavaDStream<String> test3 = sscWithZeroMQ.<String>zeroMQStream( + JavaDStream<String> test3 = zeromqFunc.<String>zeroMQStream( publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), SupervisorStrategy.defaultStrategy()); - - // To verify that JavaStreamingContextWithKafka is also StreamingContext - JavaDStream<String> socketStream = sscWithZeroMQ.socketTextStream("localhost", 9999); } } |