diff options
author | jerryshao <saisai.shao@intel.com> | 2013-10-12 15:02:57 +0800 |
---|---|---|
committer | jerryshao <saisai.shao@intel.com> | 2013-10-12 20:00:42 +0800 |
commit | c23cd72b4bbcbf5f615636095c69e9a2e39bfbdd (patch) | |
tree | 14a083addc11bef9eeac371c8f383dc0e1c439d5 /streaming/src/test/java | |
parent | dca80094d317363e1e0d7e32bc7dfd99faf943cf (diff) | |
download | spark-c23cd72b4bbcbf5f615636095c69e9a2e39bfbdd.tar.gz spark-c23cd72b4bbcbf5f615636095c69e9a2e39bfbdd.tar.bz2 spark-c23cd72b4bbcbf5f615636095c69e9a2e39bfbdd.zip |
Upgrade Kafka 0.7.2 to Kafka 0.8.0-beta1 for Spark Streaming
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r-- | streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 16 |
1 files changed, 11 insertions, 5 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index c0d729ff87..dc01f1e8aa 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -1220,14 +1220,20 @@ public class JavaAPISuite implements Serializable { @Test public void testKafkaStream() { HashMap<String, Integer> topics = Maps.newHashMap(); - JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics); - JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, + JavaPairDStream<String, String> test1 = ssc.kafkaStream("localhost:12345", "group", topics); + JavaPairDStream<String, String> test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK()); HashMap<String, String> kafkaParams = Maps.newHashMap(); - kafkaParams.put("zk.connect","localhost:12345"); - kafkaParams.put("groupid","consumer-group"); - JavaDStream test3 = ssc.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics, + kafkaParams.put("zookeeper.connect","localhost:12345"); + kafkaParams.put("group.id","consumer-group"); + JavaPairDStream<String, String> test3 = ssc.kafkaStream( + String.class, + String.class, + StringDecoder.class, + StringDecoder.class, + kafkaParams, + topics, StorageLevel.MEMORY_AND_DISK()); } |