aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-10-22 23:40:40 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2013-10-22 23:40:40 -0700
commitfe8626efd1e9f78b8f7db4622264da9279ee2642 (patch)
tree2ab3be8e61f79a8935069e236442e9cea8b2dd43 /streaming/src/test/java
parent72d2e1dd777696640f64aaf92fecab64c6387df0 (diff)
parent452aa36d671d3fdf53058e80cbd86787c8870cd7 (diff)
downloadspark-fe8626efd1e9f78b8f7db4622264da9279ee2642.tar.gz
spark-fe8626efd1e9f78b8f7db4622264da9279ee2642.tar.bz2
spark-fe8626efd1e9f78b8f7db4622264da9279ee2642.zip
Merge branch 'apache-master' into transform
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java16
1 files changed, 11 insertions, 5 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 16622a3459..2f92421367 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -1492,14 +1492,20 @@ public class JavaAPISuite implements Serializable {
@Test
public void testKafkaStream() {
HashMap<String, Integer> topics = Maps.newHashMap();
- JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics);
- JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics,
+ JavaPairDStream<String, String> test1 = ssc.kafkaStream("localhost:12345", "group", topics);
+ JavaPairDStream<String, String> test2 = ssc.kafkaStream("localhost:12345", "group", topics,
StorageLevel.MEMORY_AND_DISK());
HashMap<String, String> kafkaParams = Maps.newHashMap();
- kafkaParams.put("zk.connect","localhost:12345");
- kafkaParams.put("groupid","consumer-group");
- JavaDStream test3 = ssc.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics,
+ kafkaParams.put("zookeeper.connect","localhost:12345");
+ kafkaParams.put("group.id","consumer-group");
+ JavaPairDStream<String, String> test3 = ssc.kafkaStream(
+ String.class,
+ String.class,
+ StringDecoder.class,
+ StringDecoder.class,
+ kafkaParams,
+ topics,
StorageLevel.MEMORY_AND_DISK());
}