author     seanm <sean.mcnamara@webtrends.com>    2013-05-10 12:05:10 -0600
committer  seanm <sean.mcnamara@webtrends.com>    2013-05-10 12:05:10 -0600
commit     d761e7359deb7ca864d33b8f2e4380b57448630b (patch)
tree       00240df4d48cb1a1264bebdb1a9086e7f5b96a02 /streaming
parent     7e56e99573b4cf161293e648aeb159375c9c0fcb (diff)
adding kafkaStream API tests
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/test/java/spark/streaming/JavaAPISuite.java          4
-rw-r--r--  streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala  11
2 files changed, 13 insertions(+), 2 deletions(-)
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 61e4c0a207..350d0888a3 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -4,6 +4,7 @@ import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
+import kafka.serializer.StringDecoder;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.junit.After;
import org.junit.Assert;
@@ -1203,8 +1204,7 @@ public class JavaAPISuite implements Serializable {
public void testKafkaStream() {
HashMap<String, Integer> topics = Maps.newHashMap();
JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics);
- JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics);
- JavaDStream test3 = ssc.kafkaStream("localhost:12345", "group", topics,
+ JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics,
StorageLevel.MEMORY_AND_DISK());
}
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 1024d3ac97..595c766a21 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -240,6 +240,17 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
assert(output(i) === expectedOutput(i))
}
}
+
+ test("kafka input stream") {
+ val ssc = new StreamingContext(master, framework, batchDuration)
+ val topics = Map("my-topic" -> 1)
+ val test1 = ssc.kafkaStream("localhost:12345", "group", topics)
+ val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK)
+
+ // Test specifying decoder
+ val kafkaParams = Map("zk.connect"->"localhost:12345","groupid"->"consumer-group")
+ val test3 = ssc.kafkaStream[String, kafka.serializer.StringDecoder](kafkaParams, topics, StorageLevel.MEMORY_AND_DISK)
+ }
}
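
For context, the kafkaStream overloads exercised by these tests can be called from application code roughly as follows. This is a minimal sketch based only on the calls shown in the diff (Spark 0.7-era spark.* packages); the master URL, batch duration, ZooKeeper address, topic name, and consumer group are placeholder values, and a reachable Kafka/ZooKeeper endpoint is assumed.

// Sketch of the three kafkaStream variants covered by the new tests.
import spark.storage.StorageLevel
import spark.streaming.{Seconds, StreamingContext}

object KafkaStreamSketch {
  def main(args: Array[String]) {
    // Placeholder master and batch interval, as in a local test setup.
    val ssc = new StreamingContext("local[2]", "KafkaStreamSketch", Seconds(1))

    // One consumer thread per topic.
    val topics = Map("my-topic" -> 1)

    // 1. ZooKeeper address, consumer group, and topic map; default storage level.
    val stream1 = ssc.kafkaStream("localhost:12345", "group", topics)

    // 2. Same call with an explicit storage level.
    val stream2 = ssc.kafkaStream("localhost:12345", "group", topics,
      StorageLevel.MEMORY_AND_DISK)

    // 3. Explicit Kafka consumer properties plus the value type and decoder
    //    as type parameters, mirroring the "Test specifying decoder" case above.
    val kafkaParams = Map("zk.connect" -> "localhost:12345", "groupid" -> "consumer-group")
    val stream3 = ssc.kafkaStream[String, kafka.serializer.StringDecoder](
      kafkaParams, topics, StorageLevel.MEMORY_AND_DISK)

    stream2.print()
    ssc.start()
  }
}

As the JavaAPISuite hunk shows, the Java API takes the same arguments, with the storage level supplied via the StorageLevel.MEMORY_AND_DISK() accessor.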