aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorseanm <sean.mcnamara@webtrends.com>2013-03-24 13:40:19 -0600
committerseanm <sean.mcnamara@webtrends.com>2013-04-16 17:17:16 -0600
commit7e56e99573b4cf161293e648aeb159375c9c0fcb (patch)
tree9fa6d9f686cb39b3bb9a97196818079a71a38b31 /examples
parent8ac9efba5a435443be9abf8ebbe867806d42c9db (diff)
downloadspark-7e56e99573b4cf161293e648aeb159375c9c0fcb.tar.gz
spark-7e56e99573b4cf161293e648aeb159375c9c0fcb.tar.bz2
spark-7e56e99573b4cf161293e648aeb159375c9c0fcb.zip
Surfacing decoders on KafkaInputDStream
Diffstat (limited to 'examples')
-rw-r--r--examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala2
1 files changed, 1 insertions, 1 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
index 9b135a5c54..e0c3555f21 100644
--- a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
@@ -37,7 +37,7 @@ object KafkaWordCount {
ssc.checkpoint("checkpoint")
val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
- val lines = ssc.kafkaStream[String](zkQuorum, group, topicpMap)
+ val lines = ssc.kafkaStream(zkQuorum, group, topicpMap)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
wordCounts.print()