diff options
Diffstat (limited to 'external/kafka')
-rw-r--r-- | external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala (renamed from external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala) | 12 | ||||
-rw-r--r-- | external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java | 15 |
2 files changed, 13 insertions, 14 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala index ab0e8a6c8d..491331bb37 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.streaming.kafka +package org.apache.spark.streaming.api.java.kafka import scala.reflect.ClassTag import scala.collection.JavaConversions._ @@ -27,13 +27,13 @@ import kafka.serializer.Decoder import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream} +import org.apache.spark.streaming.kafka._ /** * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra * functions for creating Kafka input streams. */ -class JavaStreamingContextWithKafka(javaStreamingContext: JavaStreamingContext) - extends JavaStreamingContext(javaStreamingContext.ssc) { +class KafkaFunctions(javaStreamingContext: JavaStreamingContext) { /** * Create an input stream that pulls messages form a Kafka Broker. @@ -49,7 +49,7 @@ class JavaStreamingContextWithKafka(javaStreamingContext: JavaStreamingContext) ): JavaPairDStream[String, String] = { implicit val cmt: ClassTag[String] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] - ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*)) + javaStreamingContext.ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*)) } /** @@ -69,7 +69,7 @@ class JavaStreamingContextWithKafka(javaStreamingContext: JavaStreamingContext) ): JavaPairDStream[String, String] = { implicit val cmt: ClassTag[String] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] - ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) + javaStreamingContext.ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) } /** @@ -101,7 +101,7 @@ class JavaStreamingContextWithKafka(javaStreamingContext: JavaStreamingContext) implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]] implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]] - ssc.kafkaStream[K, V, U, T]( + javaStreamingContext.ssc.kafkaStream[K, V, U, T]( kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) } } diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java index 66236df662..fdea96e506 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -18,6 +18,8 @@ package org.apache.spark.streaming.kafka; import java.util.HashMap; + +import org.apache.spark.streaming.api.java.kafka.KafkaFunctions; import org.junit.Test; import com.google.common.collect.Maps; import kafka.serializer.StringDecoder; @@ -31,21 +33,18 @@ public class JavaKafkaStreamSuite extends LocalJavaStreamingContext { public void testKafkaStream() { HashMap<String, Integer> topics = Maps.newHashMap(); - JavaStreamingContextWithKafka sscWithKafka = new JavaStreamingContextWithKafka(ssc); + KafkaFunctions kafkaFunc = new KafkaFunctions(ssc); // tests the API, does not actually test data receiving - JavaPairDStream<String, String> test1 = sscWithKafka.kafkaStream("localhost:12345", "group", topics); - JavaPairDStream<String, String> test2 = sscWithKafka.kafkaStream("localhost:12345", "group", topics, + JavaPairDStream<String, String> test1 = kafkaFunc.kafkaStream("localhost:12345", "group", topics); + JavaPairDStream<String, String> test2 = kafkaFunc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2()); HashMap<String, String> kafkaParams = Maps.newHashMap(); - kafkaParams.put("zookeeper.connect","localhost:12345"); + kafkaParams.put("zookeeper.connect", "localhost:12345"); kafkaParams.put("group.id","consumer-group"); - JavaPairDStream<String, String> test3 = sscWithKafka.kafkaStream( + JavaPairDStream<String, String> test3 = kafkaFunc.kafkaStream( String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2()); - - // To verify that JavaStreamingContextWithKafka is also StreamingContext - JavaDStream<String> socketStream = sscWithKafka.socketTextStream("localhost", 9999); } } |