aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.s@imaginea.com>2013-07-03 11:43:26 +0530
committerPrashant Sharma <prashant.s@imaginea.com>2013-07-03 11:43:26 +0530
commita5f1f6a907b116325c56d38157ec2df76150951e (patch)
tree27de949c24a61b2301c7690db9e28992f49ea39c /streaming/src/test/java
parentb7794813b181f13801596e8d8c3b4471c0c84f20 (diff)
parent6d60fe571a405eb9306a2be1817901316a46f892 (diff)
downloadspark-a5f1f6a907b116325c56d38157ec2df76150951e.tar.gz
spark-a5f1f6a907b116325c56d38157ec2df76150951e.tar.bz2
spark-a5f1f6a907b116325c56d38157ec2df76150951e.zip
Merge branch 'master' into master-merge
Conflicts: core/pom.xml core/src/main/scala/spark/MapOutputTracker.scala core/src/main/scala/spark/RDD.scala core/src/main/scala/spark/RDDCheckpointData.scala core/src/main/scala/spark/SparkContext.scala core/src/main/scala/spark/Utils.scala core/src/main/scala/spark/api/python/PythonRDD.scala core/src/main/scala/spark/deploy/client/Client.scala core/src/main/scala/spark/deploy/master/MasterWebUI.scala core/src/main/scala/spark/deploy/worker/Worker.scala core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala core/src/main/scala/spark/rdd/BlockRDD.scala core/src/main/scala/spark/rdd/ZippedRDD.scala core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala core/src/main/scala/spark/storage/BlockManager.scala core/src/main/scala/spark/storage/BlockManagerMaster.scala core/src/main/scala/spark/storage/BlockManagerMasterActor.scala core/src/main/scala/spark/storage/BlockManagerUI.scala core/src/main/scala/spark/util/AkkaUtils.scala core/src/test/scala/spark/SizeEstimatorSuite.scala pom.xml project/SparkBuild.scala repl/src/main/scala/spark/repl/SparkILoop.scala repl/src/test/scala/spark/repl/ReplSuite.scala streaming/src/main/scala/spark/streaming/StreamingContext.scala streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r--streaming/src/test/java/spark/streaming/JavaAPISuite.java14
1 files changed, 9 insertions, 5 deletions
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 3bed500f73..4cf10582a9 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -4,6 +4,7 @@ import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
+import kafka.serializer.StringDecoder;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.junit.After;
import org.junit.Assert;
@@ -23,7 +24,6 @@ import spark.streaming.api.java.JavaPairDStream;
import spark.streaming.api.java.JavaStreamingContext;
import spark.streaming.JavaTestUtils;
import spark.streaming.JavaCheckpointTestUtils;
-import spark.streaming.dstream.KafkaPartitionKey;
import spark.streaming.InputStreamsSuite;
import java.io.*;
@@ -1203,10 +1203,14 @@ public class JavaAPISuite implements Serializable {
@Test
public void testKafkaStream() {
HashMap<String, Integer> topics = Maps.newHashMap();
- HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap();
JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics);
- JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, offsets);
- JavaDStream test3 = ssc.kafkaStream("localhost:12345", "group", topics, offsets,
+ JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics,
+ StorageLevel.MEMORY_AND_DISK());
+
+ HashMap<String, String> kafkaParams = Maps.newHashMap();
+ kafkaParams.put("zk.connect","localhost:12345");
+ kafkaParams.put("groupid","consumer-group");
+ JavaDStream test3 = ssc.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics,
StorageLevel.MEMORY_AND_DISK());
}
@@ -1263,7 +1267,7 @@ public class JavaAPISuite implements Serializable {
@Test
public void testTwitterStream() {
String[] filters = new String[] { "good", "bad", "ugly" };
- JavaDStream test = ssc.twitterStream("username", "password", filters, StorageLevel.MEMORY_ONLY());
+ JavaDStream test = ssc.twitterStream(filters, StorageLevel.MEMORY_ONLY());
}
@Test