author     Tathagata Das <tathagata.das1565@gmail.com>  2015-02-09 22:45:48 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-02-09 22:45:48 -0800
commit     c15134632e74e3dee05eda20c6ef79915e15d02e (patch)
tree       4c97e1c6b7951d97950a7ff45c43b79d60733ede /examples/scala-2.10/src/main/scala/org
parent     ef2f55b97f58fa06acb30e9e0172fb66fba383bc (diff)
[SPARK-4964][Streaming][Kafka] More updates to Exactly-once Kafka stream
Changes
- Added example
- Added a critical unit test that verifies that offset ranges can be recovered through checkpoints

Might add more changes.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #4384 from tdas/new-kafka-fixes and squashes the following commits:

7c931c3 [Tathagata Das] Small update
3ed9284 [Tathagata Das] updated scala doc
83d0402 [Tathagata Das] Added JavaDirectKafkaWordCount example.
26df23c [Tathagata Das] Updates based on PR comments from Cody
e4abf69 [Tathagata Das] Scala doc improvements and stuff.
bb65232 [Tathagata Das] Fixed test bug and refactored KafkaStreamSuite
50f2b56 [Tathagata Das] Added Java API and added more Scala and Java unit tests. Also updated docs.
e73589c [Tathagata Das] Minor changes.
4986784 [Tathagata Das] Added unit test to kafka offset recovery
6a91cab [Tathagata Das] Added example
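The offset-range bookkeeping that the new unit test exercises is exposed on each batch RDD of the direct stream through the HasOffsetRanges trait in org.apache.spark.streaming.kafka. A minimal sketch of reading those ranges, assuming the Spark 1.3-era direct-stream API this change is part of (the broker address, topic name, and object name below are placeholders, not code from this commit):

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

object OffsetRangesSketch {
  def main(args: Array[String]) {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("OffsetRangesSketch"), Seconds(2))

    // Placeholder broker and topic; substitute real values.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, Map("metadata.broker.list" -> "broker1-host:port"), Set("topic1"))

    stream.foreachRDD { rdd =>
      // Each batch RDD of a direct stream records exactly which Kafka
      // offsets it covers; this is the state that checkpoint recovery
      // restores after a driver restart.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach { o =>
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}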
Diffstat (limited to 'examples/scala-2.10/src/main/scala/org')
-rw-r--r--  examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala | 71
1 file changed, 71 insertions, 0 deletions
diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
new file mode 100644
index 0000000000..deb08fd57b
--- /dev/null
+++ b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import kafka.serializer.StringDecoder
+
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.kafka._
+import org.apache.spark.SparkConf
+
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ * Usage: DirectKafkaWordCount <brokers> <topics>
+ * <brokers> is a list of one or more Kafka brokers
+ *   <topics> is a list of one or more Kafka topics to consume from
+ *
+ * Example:
+ *    $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port topic1,topic2
+ */
+object DirectKafkaWordCount {
+ def main(args: Array[String]) {
+ if (args.length < 2) {
+ System.err.println(s"""
+ |Usage: DirectKafkaWordCount <brokers> <topics>
+ | <brokers> is a list of one or more Kafka brokers
+        |  <topics> is a list of one or more Kafka topics to consume from
+ |
+        """.stripMargin)
+ System.exit(1)
+ }
+
+ StreamingExamples.setStreamingLogLevels()
+
+ val Array(brokers, topics) = args
+
+ // Create context with 2 second batch interval
+ val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
+ val ssc = new StreamingContext(sparkConf, Seconds(2))
+
+ // Create direct kafka stream with brokers and topics
+ val topicsSet = topics.split(",").toSet
+ val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
+ val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, topicsSet)
+
+ // Get the lines, split them into words, count the words and print
+ val lines = messages.map(_._2)
+ val words = lines.flatMap(_.split(" "))
+ val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
+ wordCounts.print()
+
+ // Start the computation
+ ssc.start()
+ ssc.awaitTermination()
+ }
+}
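For the checkpoint-based offset recovery that the new KafkaStreamSuite test verifies, the usual pattern is to build the context inside a factory passed to StreamingContext.getOrCreate, so a restarted driver resumes from the checkpointed offset ranges instead of starting fresh. A hedged sketch extending the example above; the checkpoint directory, object name, broker, and topic are illustrative assumptions, not part of this commit:

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object CheckpointedDirectKafka {
  // Hypothetical checkpoint location; any fault-tolerant path (e.g. HDFS) works.
  val checkpointDir = "/tmp/direct-kafka-checkpoint"

  def createContext(): StreamingContext = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("CheckpointedDirectKafka"), Seconds(2))
    ssc.checkpoint(checkpointDir)

    // Placeholder broker and topic; substitute real values.
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, Map("metadata.broker.list" -> "broker1-host:port"), Set("topic1"))

    // Same word count as the example, rebuilt whenever the context is created anew.
    messages.map(_._2)
      .flatMap(_.split(" "))
      .map(word => (word, 1L))
      .reduceByKey(_ + _)
      .print()
    ssc
  }

  def main(args: Array[String]) {
    // On a clean start this calls createContext; on restart it rebuilds the
    // context from the checkpoint, including the offset ranges of any
    // batches that had been generated but not yet completed.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}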