diff options
author | Prashant Sharma <prashant.s@imaginea.com> | 2013-07-03 11:43:26 +0530 |
---|---|---|
committer | Prashant Sharma <prashant.s@imaginea.com> | 2013-07-03 11:43:26 +0530 |
commit | a5f1f6a907b116325c56d38157ec2df76150951e (patch) | |
tree | 27de949c24a61b2301c7690db9e28992f49ea39c /streaming/src/test/java | |
parent | b7794813b181f13801596e8d8c3b4471c0c84f20 (diff) | |
parent | 6d60fe571a405eb9306a2be1817901316a46f892 (diff) | |
download | spark-a5f1f6a907b116325c56d38157ec2df76150951e.tar.gz spark-a5f1f6a907b116325c56d38157ec2df76150951e.tar.bz2 spark-a5f1f6a907b116325c56d38157ec2df76150951e.zip |
Merge branch 'master' into master-merge
Conflicts:
core/pom.xml
core/src/main/scala/spark/MapOutputTracker.scala
core/src/main/scala/spark/RDD.scala
core/src/main/scala/spark/RDDCheckpointData.scala
core/src/main/scala/spark/SparkContext.scala
core/src/main/scala/spark/Utils.scala
core/src/main/scala/spark/api/python/PythonRDD.scala
core/src/main/scala/spark/deploy/client/Client.scala
core/src/main/scala/spark/deploy/master/MasterWebUI.scala
core/src/main/scala/spark/deploy/worker/Worker.scala
core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
core/src/main/scala/spark/rdd/BlockRDD.scala
core/src/main/scala/spark/rdd/ZippedRDD.scala
core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
core/src/main/scala/spark/storage/BlockManager.scala
core/src/main/scala/spark/storage/BlockManagerMaster.scala
core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
core/src/main/scala/spark/storage/BlockManagerUI.scala
core/src/main/scala/spark/util/AkkaUtils.scala
core/src/test/scala/spark/SizeEstimatorSuite.scala
pom.xml
project/SparkBuild.scala
repl/src/main/scala/spark/repl/SparkILoop.scala
repl/src/test/scala/spark/repl/ReplSuite.scala
streaming/src/main/scala/spark/streaming/StreamingContext.scala
streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r-- | streaming/src/test/java/spark/streaming/JavaAPISuite.java | 14 |
1 files changed, 9 insertions, 5 deletions
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java index 3bed500f73..4cf10582a9 100644 --- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java @@ -4,6 +4,7 @@ import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.Files; +import kafka.serializer.StringDecoder; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.junit.After; import org.junit.Assert; @@ -23,7 +24,6 @@ import spark.streaming.api.java.JavaPairDStream; import spark.streaming.api.java.JavaStreamingContext; import spark.streaming.JavaTestUtils; import spark.streaming.JavaCheckpointTestUtils; -import spark.streaming.dstream.KafkaPartitionKey; import spark.streaming.InputStreamsSuite; import java.io.*; @@ -1203,10 +1203,14 @@ public class JavaAPISuite implements Serializable { @Test public void testKafkaStream() { HashMap<String, Integer> topics = Maps.newHashMap(); - HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap(); JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics); - JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, offsets); - JavaDStream test3 = ssc.kafkaStream("localhost:12345", "group", topics, offsets, + JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, + StorageLevel.MEMORY_AND_DISK()); + + HashMap<String, String> kafkaParams = Maps.newHashMap(); + kafkaParams.put("zk.connect","localhost:12345"); + kafkaParams.put("groupid","consumer-group"); + JavaDStream test3 = ssc.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK()); } @@ -1263,7 +1267,7 @@ public class JavaAPISuite implements Serializable { @Test public void testTwitterStream() { String[] filters = new String[] { "good", "bad", "ugly" }; - JavaDStream test = ssc.twitterStream("username", "password", filters, StorageLevel.MEMORY_ONLY()); + JavaDStream test = ssc.twitterStream(filters, StorageLevel.MEMORY_ONLY()); } @Test |