diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-06-24 21:44:50 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-06-24 21:44:50 -0700 |
commit | cfcda95f86c6eec4a39e5ad182e068722be66fe7 (patch) | |
tree | 55d3f7f1f1b299c44150694a1f044b93dfa28211 /streaming/src/test | |
parent | 575aff6b718f9447abd6dde40fd72c66b40774a8 (diff) | |
parent | f25282def5826fab6caabff28c82c57a7f3fdcb8 (diff) | |
download | spark-cfcda95f86c6eec4a39e5ad182e068722be66fe7.tar.gz spark-cfcda95f86c6eec4a39e5ad182e068722be66fe7.tar.bz2 spark-cfcda95f86c6eec4a39e5ad182e068722be66fe7.zip |
Merge pull request #571 from Reinvigorate/sm-kafka-serializers
Surfacing decoders on KafkaInputDStream
Diffstat (limited to 'streaming/src/test')
-rw-r--r-- | streaming/src/test/java/spark/streaming/JavaAPISuite.java | 12 | ||||
-rw-r--r-- | streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala | 11 |
2 files changed, 19 insertions, 4 deletions
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java index 3bed500f73..e5fdbe1b7a 100644 --- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java @@ -4,6 +4,7 @@ import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.Files; +import kafka.serializer.StringDecoder; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.junit.After; import org.junit.Assert; @@ -23,7 +24,6 @@ import spark.streaming.api.java.JavaPairDStream; import spark.streaming.api.java.JavaStreamingContext; import spark.streaming.JavaTestUtils; import spark.streaming.JavaCheckpointTestUtils; -import spark.streaming.dstream.KafkaPartitionKey; import spark.streaming.InputStreamsSuite; import java.io.*; @@ -1203,10 +1203,14 @@ public class JavaAPISuite implements Serializable { @Test public void testKafkaStream() { HashMap<String, Integer> topics = Maps.newHashMap(); - HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap(); JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics); - JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, offsets); - JavaDStream test3 = ssc.kafkaStream("localhost:12345", "group", topics, offsets, + JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, + StorageLevel.MEMORY_AND_DISK()); + + HashMap<String, String> kafkaParams = Maps.newHashMap(); + kafkaParams.put("zk.connect","localhost:12345"); + kafkaParams.put("groupid","consumer-group"); + JavaDStream test3 = ssc.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK()); } diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index 1024d3ac97..595c766a21 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -240,6 +240,17 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { assert(output(i) === expectedOutput(i)) } } + + test("kafka input stream") { + val ssc = new StreamingContext(master, framework, batchDuration) + val topics = Map("my-topic" -> 1) + val test1 = ssc.kafkaStream("localhost:12345", "group", topics) + val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK) + + // Test specifying decoder + val kafkaParams = Map("zk.connect"->"localhost:12345","groupid"->"consumer-group") + val test3 = ssc.kafkaStream[String, kafka.serializer.StringDecoder](kafkaParams, topics, StorageLevel.MEMORY_AND_DISK) + } } |