aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java
diff options
context:
space:
mode:
authorjerryshao <saisai.shao@intel.com>2013-10-12 15:02:57 +0800
committerjerryshao <saisai.shao@intel.com>2013-10-12 20:00:42 +0800
commitc23cd72b4bbcbf5f615636095c69e9a2e39bfbdd (patch)
tree14a083addc11bef9eeac371c8f383dc0e1c439d5 /streaming/src/test/java
parentdca80094d317363e1e0d7e32bc7dfd99faf943cf (diff)
downloadspark-c23cd72b4bbcbf5f615636095c69e9a2e39bfbdd.tar.gz
spark-c23cd72b4bbcbf5f615636095c69e9a2e39bfbdd.tar.bz2
spark-c23cd72b4bbcbf5f615636095c69e9a2e39bfbdd.zip
Upgrade Kafka 0.7.2 to Kafka 0.8.0-beta1 for Spark Streaming
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java16
1 files changed, 11 insertions, 5 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index c0d729ff87..dc01f1e8aa 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -1220,14 +1220,20 @@ public class JavaAPISuite implements Serializable {
@Test
public void testKafkaStream() {
HashMap<String, Integer> topics = Maps.newHashMap();
- JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics);
- JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics,
+ JavaPairDStream<String, String> test1 = ssc.kafkaStream("localhost:12345", "group", topics);
+ JavaPairDStream<String, String> test2 = ssc.kafkaStream("localhost:12345", "group", topics,
StorageLevel.MEMORY_AND_DISK());
HashMap<String, String> kafkaParams = Maps.newHashMap();
- kafkaParams.put("zk.connect","localhost:12345");
- kafkaParams.put("groupid","consumer-group");
- JavaDStream test3 = ssc.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics,
+ kafkaParams.put("zookeeper.connect","localhost:12345");
+ kafkaParams.put("group.id","consumer-group");
+ JavaPairDStream<String, String> test3 = ssc.kafkaStream(
+ String.class,
+ String.class,
+ StringDecoder.class,
+ StringDecoder.class,
+ kafkaParams,
+ topics,
StorageLevel.MEMORY_AND_DISK());
}