aboutsummaryrefslogtreecommitdiff
path: root/examples/src/main/scala
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-05-28 09:04:12 -0700
committerPatrick Wendell <patrick@databricks.com>2015-05-28 09:04:12 -0700
commit000df2f0d6af068bb188e81bbb207f0c2f43bf16 (patch)
treed9843f0570102ea4d6263ac716f7bdfd59a8d6e4 /examples/src/main/scala
parente838a25bdb5603ef05e779225704c972ce436145 (diff)
downloadspark-000df2f0d6af068bb188e81bbb207f0c2f43bf16.tar.gz
spark-000df2f0d6af068bb188e81bbb207f0c2f43bf16.tar.bz2
spark-000df2f0d6af068bb188e81bbb207f0c2f43bf16.zip
[SPARK-7895] [STREAMING] [EXAMPLES] Move Kafka examples from scala-2.10/src to src
Since `spark-streaming-kafka` now is published for both Scala 2.10 and 2.11, we can move `KafkaWordCount` and `DirectKafkaWordCount` from `examples/scala-2.10/src/` to `examples/src/` so that they will appear in `spark-examples-***-jar` for Scala 2.11. Author: zsxwing <zsxwing@gmail.com> Closes #6436 from zsxwing/SPARK-7895 and squashes the following commits: c6052f1 [zsxwing] Update examples/pom.xml 0bcfa87 [zsxwing] Fix the sleep time b9d1256 [zsxwing] Move Kafka examples from scala-2.10/src to src
Diffstat (limited to 'examples/src/main/scala')
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala72
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala103
2 files changed, 175 insertions, 0 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
new file mode 100644
index 0000000000..11a8cf0953
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import kafka.serializer.StringDecoder
+
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.kafka._
+import org.apache.spark.SparkConf
+
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ * Usage: DirectKafkaWordCount <brokers> <topics>
+ * <brokers> is a list of one or more Kafka brokers
+ * <topics> is a list of one or more kafka topics to consume from
+ *
+ * Example:
+ * $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port \
+ * topic1,topic2
+ */
+object DirectKafkaWordCount {
+ def main(args: Array[String]) {
+ if (args.length < 2) {
+ System.err.println(s"""
+ |Usage: DirectKafkaWordCount <brokers> <topics>
+ | <brokers> is a list of one or more Kafka brokers
+ | <topics> is a list of one or more kafka topics to consume from
+ |
+ """.stripMargin)
+ System.exit(1)
+ }
+
+ StreamingExamples.setStreamingLogLevels()
+
+ val Array(brokers, topics) = args
+
+ // Create context with 2 second batch interval
+ val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
+ val ssc = new StreamingContext(sparkConf, Seconds(2))
+
+ // Create direct kafka stream with brokers and topics
+ val topicsSet = topics.split(",").toSet
+ val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
+ val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, topicsSet)
+
+ // Get the lines, split them into words, count the words and print
+ val lines = messages.map(_._2)
+ val words = lines.flatMap(_.split(" "))
+ val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
+ wordCounts.print()
+
+ // Start the computation
+ ssc.start()
+ ssc.awaitTermination()
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
new file mode 100644
index 0000000000..9ae1b045c2
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import java.util.HashMap
+
+import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}
+
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.kafka._
+import org.apache.spark.SparkConf
+
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ * Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
+ * <zkQuorum> is a list of one or more zookeeper servers that make quorum
+ * <group> is the name of kafka consumer group
+ * <topics> is a list of one or more kafka topics to consume from
+ * <numThreads> is the number of threads the kafka consumer should use
+ *
+ * Example:
+ * `$ bin/run-example \
+ * org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \
+ * my-consumer-group topic1,topic2 1`
+ */
+object KafkaWordCount {
+ def main(args: Array[String]) {
+ if (args.length < 4) {
+ System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
+ System.exit(1)
+ }
+
+ StreamingExamples.setStreamingLogLevels()
+
+ val Array(zkQuorum, group, topics, numThreads) = args
+ val sparkConf = new SparkConf().setAppName("KafkaWordCount")
+ val ssc = new StreamingContext(sparkConf, Seconds(2))
+ ssc.checkpoint("checkpoint")
+
+ val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
+ val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
+ val words = lines.flatMap(_.split(" "))
+ val wordCounts = words.map(x => (x, 1L))
+ .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
+ wordCounts.print()
+
+ ssc.start()
+ ssc.awaitTermination()
+ }
+}
+
+// Produces some random words between 1 and 100.
+object KafkaWordCountProducer {
+
+ def main(args: Array[String]) {
+ if (args.length < 4) {
+ System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
+ "<messagesPerSec> <wordsPerMessage>")
+ System.exit(1)
+ }
+
+ val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
+
+ // Zookeeper connection properties
+ val props = new HashMap[String, Object]()
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringSerializer")
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringSerializer")
+
+ val producer = new KafkaProducer[String, String](props)
+
+ // Send some messages
+ while(true) {
+ (1 to messagesPerSec.toInt).foreach { messageNum =>
+ val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
+ .mkString(" ")
+
+ val message = new ProducerRecord[String, String](topic, null, str)
+ producer.send(message)
+ }
+
+ Thread.sleep(1000)
+ }
+ }
+
+}