From 000df2f0d6af068bb188e81bbb207f0c2f43bf16 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 28 May 2015 09:04:12 -0700 Subject: [SPARK-7895] [STREAMING] [EXAMPLES] Move Kafka examples from scala-2.10/src to src Since `spark-streaming-kafka` now is published for both Scala 2.10 and 2.11, we can move `KafkaWordCount` and `DirectKafkaWordCount` from `examples/scala-2.10/src/` to `examples/src/` so that they will appear in `spark-examples-***-jar` for Scala 2.11. Author: zsxwing Closes #6436 from zsxwing/SPARK-7895 and squashes the following commits: c6052f1 [zsxwing] Update examples/pom.xml 0bcfa87 [zsxwing] Fix the sleep time b9d1256 [zsxwing] Move Kafka examples from scala-2.10/src to src --- examples/pom.xml | 44 +------- .../streaming/JavaDirectKafkaWordCount.java | 113 --------------------- .../examples/streaming/JavaKafkaWordCount.java | 113 --------------------- .../examples/streaming/DirectKafkaWordCount.scala | 72 ------------- .../spark/examples/streaming/KafkaWordCount.scala | 103 ------------------- .../streaming/JavaDirectKafkaWordCount.java | 113 +++++++++++++++++++++ .../examples/streaming/JavaKafkaWordCount.java | 113 +++++++++++++++++++++ .../examples/streaming/DirectKafkaWordCount.scala | 72 +++++++++++++ .../spark/examples/streaming/KafkaWordCount.scala | 103 +++++++++++++++++++ 9 files changed, 406 insertions(+), 440 deletions(-) delete mode 100644 examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java delete mode 100644 examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java delete mode 100644 examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala delete mode 100644 examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala create mode 100644 examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java create mode 100644 examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala diff --git a/examples/pom.xml b/examples/pom.xml index 5b04b4f8d6..e4efee7b5e 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -97,6 +97,11 @@ + + org.apache.spark + spark-streaming-kafka_${scala.binary.version} + ${project.version} + org.apache.hbase hbase-testing-util @@ -392,45 +397,6 @@ - - - scala-2.10 - - !scala-2.11 - - - - org.apache.spark - spark-streaming-kafka_${scala.binary.version} - ${project.version} - - - - - - org.codehaus.mojo - build-helper-maven-plugin - - - add-scala-sources - generate-sources - - add-source - - - - src/main/scala - scala-2.10/src/main/scala - scala-2.10/src/main/java - - - - - - - - diff --git a/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java deleted file mode 100644 index bab9f2478e..0000000000 --- a/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.examples.streaming; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Arrays; -import java.util.regex.Pattern; - -import scala.Tuple2; - -import com.google.common.collect.Lists; -import kafka.serializer.StringDecoder; - -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.*; -import org.apache.spark.streaming.api.java.*; -import org.apache.spark.streaming.kafka.KafkaUtils; -import org.apache.spark.streaming.Durations; - -/** - * Consumes messages from one or more topics in Kafka and does wordcount. - * Usage: DirectKafkaWordCount - * is a list of one or more Kafka brokers - * is a list of one or more kafka topics to consume from - * - * Example: - * $ bin/run-example streaming.KafkaWordCount broker1-host:port,broker2-host:port topic1,topic2 - */ - -public final class JavaDirectKafkaWordCount { - private static final Pattern SPACE = Pattern.compile(" "); - - public static void main(String[] args) { - if (args.length < 2) { - System.err.println("Usage: DirectKafkaWordCount \n" + - " is a list of one or more Kafka brokers\n" + - " is a list of one or more kafka topics to consume from\n\n"); - System.exit(1); - } - - StreamingExamples.setStreamingLogLevels(); - - String brokers = args[0]; - String topics = args[1]; - - // Create context with 2 second batch interval - SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount"); - JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2)); - - HashSet topicsSet = new HashSet(Arrays.asList(topics.split(","))); - HashMap kafkaParams = new HashMap(); - kafkaParams.put("metadata.broker.list", brokers); - - // Create direct kafka stream with brokers and topics - JavaPairInputDStream messages = KafkaUtils.createDirectStream( - jssc, - String.class, - String.class, - StringDecoder.class, - StringDecoder.class, - kafkaParams, - topicsSet - ); - - // Get the lines, split them into words, count the words and print - JavaDStream lines = messages.map(new Function, String>() { - @Override - public String call(Tuple2 tuple2) { - return tuple2._2(); - } - }); - JavaDStream words = lines.flatMap(new FlatMapFunction() { - @Override - public Iterable call(String x) { - return Lists.newArrayList(SPACE.split(x)); - } - }); - JavaPairDStream wordCounts = words.mapToPair( - new PairFunction() { - @Override - public Tuple2 call(String s) { - return new Tuple2(s, 1); - } - }).reduceByKey( - new Function2() { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 + i2; - } - }); - wordCounts.print(); - - // Start the computation - jssc.start(); - jssc.awaitTermination(); - } -} diff --git a/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java deleted file mode 100644 index 16ae9a3319..0000000000 --- a/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.examples.streaming; - -import java.util.Map; -import java.util.HashMap; -import java.util.regex.Pattern; - - -import scala.Tuple2; - -import com.google.common.collect.Lists; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.examples.streaming.StreamingExamples; -import org.apache.spark.streaming.Duration; -import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.api.java.JavaPairDStream; -import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; -import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.apache.spark.streaming.kafka.KafkaUtils; - -/** - * Consumes messages from one or more topics in Kafka and does wordcount. - * - * Usage: JavaKafkaWordCount - * is a list of one or more zookeeper servers that make quorum - * is the name of kafka consumer group - * is a list of one or more kafka topics to consume from - * is the number of threads the kafka consumer should use - * - * To run this example: - * `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, \ - * zoo03 my-consumer-group topic1,topic2 1` - */ - -public final class JavaKafkaWordCount { - private static final Pattern SPACE = Pattern.compile(" "); - - private JavaKafkaWordCount() { - } - - public static void main(String[] args) { - if (args.length < 4) { - System.err.println("Usage: JavaKafkaWordCount "); - System.exit(1); - } - - StreamingExamples.setStreamingLogLevels(); - SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount"); - // Create the context with a 1 second batch size - JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); - - int numThreads = Integer.parseInt(args[3]); - Map topicMap = new HashMap(); - String[] topics = args[2].split(","); - for (String topic: topics) { - topicMap.put(topic, numThreads); - } - - JavaPairReceiverInputDStream messages = - KafkaUtils.createStream(jssc, args[0], args[1], topicMap); - - JavaDStream lines = messages.map(new Function, String>() { - @Override - public String call(Tuple2 tuple2) { - return tuple2._2(); - } - }); - - JavaDStream words = lines.flatMap(new FlatMapFunction() { - @Override - public Iterable call(String x) { - return Lists.newArrayList(SPACE.split(x)); - } - }); - - JavaPairDStream wordCounts = words.mapToPair( - new PairFunction() { - @Override - public Tuple2 call(String s) { - return new Tuple2(s, 1); - } - }).reduceByKey(new Function2() { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 + i2; - } - }); - - wordCounts.print(); - jssc.start(); - jssc.awaitTermination(); - } -} diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala deleted file mode 100644 index 11a8cf0953..0000000000 --- a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.examples.streaming - -import kafka.serializer.StringDecoder - -import org.apache.spark.streaming._ -import org.apache.spark.streaming.kafka._ -import org.apache.spark.SparkConf - -/** - * Consumes messages from one or more topics in Kafka and does wordcount. - * Usage: DirectKafkaWordCount - * is a list of one or more Kafka brokers - * is a list of one or more kafka topics to consume from - * - * Example: - * $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port \ - * topic1,topic2 - */ -object DirectKafkaWordCount { - def main(args: Array[String]) { - if (args.length < 2) { - System.err.println(s""" - |Usage: DirectKafkaWordCount - | is a list of one or more Kafka brokers - | is a list of one or more kafka topics to consume from - | - """.stripMargin) - System.exit(1) - } - - StreamingExamples.setStreamingLogLevels() - - val Array(brokers, topics) = args - - // Create context with 2 second batch interval - val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount") - val ssc = new StreamingContext(sparkConf, Seconds(2)) - - // Create direct kafka stream with brokers and topics - val topicsSet = topics.split(",").toSet - val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) - val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( - ssc, kafkaParams, topicsSet) - - // Get the lines, split them into words, count the words and print - val lines = messages.map(_._2) - val words = lines.flatMap(_.split(" ")) - val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _) - wordCounts.print() - - // Start the computation - ssc.start() - ssc.awaitTermination() - } -} diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala deleted file mode 100644 index f407367a54..0000000000 --- a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.examples.streaming - -import java.util.HashMap - -import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord} - -import org.apache.spark.streaming._ -import org.apache.spark.streaming.kafka._ -import org.apache.spark.SparkConf - -/** - * Consumes messages from one or more topics in Kafka and does wordcount. - * Usage: KafkaWordCount - * is a list of one or more zookeeper servers that make quorum - * is the name of kafka consumer group - * is a list of one or more kafka topics to consume from - * is the number of threads the kafka consumer should use - * - * Example: - * `$ bin/run-example \ - * org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \ - * my-consumer-group topic1,topic2 1` - */ -object KafkaWordCount { - def main(args: Array[String]) { - if (args.length < 4) { - System.err.println("Usage: KafkaWordCount ") - System.exit(1) - } - - StreamingExamples.setStreamingLogLevels() - - val Array(zkQuorum, group, topics, numThreads) = args - val sparkConf = new SparkConf().setAppName("KafkaWordCount") - val ssc = new StreamingContext(sparkConf, Seconds(2)) - ssc.checkpoint("checkpoint") - - val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap - val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) - val words = lines.flatMap(_.split(" ")) - val wordCounts = words.map(x => (x, 1L)) - .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2) - wordCounts.print() - - ssc.start() - ssc.awaitTermination() - } -} - -// Produces some random words between 1 and 100. -object KafkaWordCountProducer { - - def main(args: Array[String]) { - if (args.length < 4) { - System.err.println("Usage: KafkaWordCountProducer " + - " ") - System.exit(1) - } - - val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args - - // Zookeeper connection properties - val props = new HashMap[String, Object]() - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - "org.apache.kafka.common.serialization.StringSerializer") - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - "org.apache.kafka.common.serialization.StringSerializer") - - val producer = new KafkaProducer[String, String](props) - - // Send some messages - while(true) { - (1 to messagesPerSec.toInt).foreach { messageNum => - val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString) - .mkString(" ") - - val message = new ProducerRecord[String, String](topic, null, str) - producer.send(message) - } - - Thread.sleep(100) - } - } - -} diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java new file mode 100644 index 0000000000..bab9f2478e --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Arrays; +import java.util.regex.Pattern; + +import scala.Tuple2; + +import com.google.common.collect.Lists; +import kafka.serializer.StringDecoder; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.*; +import org.apache.spark.streaming.api.java.*; +import org.apache.spark.streaming.kafka.KafkaUtils; +import org.apache.spark.streaming.Durations; + +/** + * Consumes messages from one or more topics in Kafka and does wordcount. + * Usage: DirectKafkaWordCount + * is a list of one or more Kafka brokers + * is a list of one or more kafka topics to consume from + * + * Example: + * $ bin/run-example streaming.KafkaWordCount broker1-host:port,broker2-host:port topic1,topic2 + */ + +public final class JavaDirectKafkaWordCount { + private static final Pattern SPACE = Pattern.compile(" "); + + public static void main(String[] args) { + if (args.length < 2) { + System.err.println("Usage: DirectKafkaWordCount \n" + + " is a list of one or more Kafka brokers\n" + + " is a list of one or more kafka topics to consume from\n\n"); + System.exit(1); + } + + StreamingExamples.setStreamingLogLevels(); + + String brokers = args[0]; + String topics = args[1]; + + // Create context with 2 second batch interval + SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount"); + JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2)); + + HashSet topicsSet = new HashSet(Arrays.asList(topics.split(","))); + HashMap kafkaParams = new HashMap(); + kafkaParams.put("metadata.broker.list", brokers); + + // Create direct kafka stream with brokers and topics + JavaPairInputDStream messages = KafkaUtils.createDirectStream( + jssc, + String.class, + String.class, + StringDecoder.class, + StringDecoder.class, + kafkaParams, + topicsSet + ); + + // Get the lines, split them into words, count the words and print + JavaDStream lines = messages.map(new Function, String>() { + @Override + public String call(Tuple2 tuple2) { + return tuple2._2(); + } + }); + JavaDStream words = lines.flatMap(new FlatMapFunction() { + @Override + public Iterable call(String x) { + return Lists.newArrayList(SPACE.split(x)); + } + }); + JavaPairDStream wordCounts = words.mapToPair( + new PairFunction() { + @Override + public Tuple2 call(String s) { + return new Tuple2(s, 1); + } + }).reduceByKey( + new Function2() { + @Override + public Integer call(Integer i1, Integer i2) { + return i1 + i2; + } + }); + wordCounts.print(); + + // Start the computation + jssc.start(); + jssc.awaitTermination(); + } +} diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java new file mode 100644 index 0000000000..16ae9a3319 --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming; + +import java.util.Map; +import java.util.HashMap; +import java.util.regex.Pattern; + + +import scala.Tuple2; + +import com.google.common.collect.Lists; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.examples.streaming.StreamingExamples; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.kafka.KafkaUtils; + +/** + * Consumes messages from one or more topics in Kafka and does wordcount. + * + * Usage: JavaKafkaWordCount + * is a list of one or more zookeeper servers that make quorum + * is the name of kafka consumer group + * is a list of one or more kafka topics to consume from + * is the number of threads the kafka consumer should use + * + * To run this example: + * `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, \ + * zoo03 my-consumer-group topic1,topic2 1` + */ + +public final class JavaKafkaWordCount { + private static final Pattern SPACE = Pattern.compile(" "); + + private JavaKafkaWordCount() { + } + + public static void main(String[] args) { + if (args.length < 4) { + System.err.println("Usage: JavaKafkaWordCount "); + System.exit(1); + } + + StreamingExamples.setStreamingLogLevels(); + SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount"); + // Create the context with a 1 second batch size + JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); + + int numThreads = Integer.parseInt(args[3]); + Map topicMap = new HashMap(); + String[] topics = args[2].split(","); + for (String topic: topics) { + topicMap.put(topic, numThreads); + } + + JavaPairReceiverInputDStream messages = + KafkaUtils.createStream(jssc, args[0], args[1], topicMap); + + JavaDStream lines = messages.map(new Function, String>() { + @Override + public String call(Tuple2 tuple2) { + return tuple2._2(); + } + }); + + JavaDStream words = lines.flatMap(new FlatMapFunction() { + @Override + public Iterable call(String x) { + return Lists.newArrayList(SPACE.split(x)); + } + }); + + JavaPairDStream wordCounts = words.mapToPair( + new PairFunction() { + @Override + public Tuple2 call(String s) { + return new Tuple2(s, 1); + } + }).reduceByKey(new Function2() { + @Override + public Integer call(Integer i1, Integer i2) { + return i1 + i2; + } + }); + + wordCounts.print(); + jssc.start(); + jssc.awaitTermination(); + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala new file mode 100644 index 0000000000..11a8cf0953 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming + +import kafka.serializer.StringDecoder + +import org.apache.spark.streaming._ +import org.apache.spark.streaming.kafka._ +import org.apache.spark.SparkConf + +/** + * Consumes messages from one or more topics in Kafka and does wordcount. + * Usage: DirectKafkaWordCount + * is a list of one or more Kafka brokers + * is a list of one or more kafka topics to consume from + * + * Example: + * $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port \ + * topic1,topic2 + */ +object DirectKafkaWordCount { + def main(args: Array[String]) { + if (args.length < 2) { + System.err.println(s""" + |Usage: DirectKafkaWordCount + | is a list of one or more Kafka brokers + | is a list of one or more kafka topics to consume from + | + """.stripMargin) + System.exit(1) + } + + StreamingExamples.setStreamingLogLevels() + + val Array(brokers, topics) = args + + // Create context with 2 second batch interval + val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount") + val ssc = new StreamingContext(sparkConf, Seconds(2)) + + // Create direct kafka stream with brokers and topics + val topicsSet = topics.split(",").toSet + val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) + val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( + ssc, kafkaParams, topicsSet) + + // Get the lines, split them into words, count the words and print + val lines = messages.map(_._2) + val words = lines.flatMap(_.split(" ")) + val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _) + wordCounts.print() + + // Start the computation + ssc.start() + ssc.awaitTermination() + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala new file mode 100644 index 0000000000..9ae1b045c2 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming + +import java.util.HashMap + +import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord} + +import org.apache.spark.streaming._ +import org.apache.spark.streaming.kafka._ +import org.apache.spark.SparkConf + +/** + * Consumes messages from one or more topics in Kafka and does wordcount. + * Usage: KafkaWordCount + * is a list of one or more zookeeper servers that make quorum + * is the name of kafka consumer group + * is a list of one or more kafka topics to consume from + * is the number of threads the kafka consumer should use + * + * Example: + * `$ bin/run-example \ + * org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \ + * my-consumer-group topic1,topic2 1` + */ +object KafkaWordCount { + def main(args: Array[String]) { + if (args.length < 4) { + System.err.println("Usage: KafkaWordCount ") + System.exit(1) + } + + StreamingExamples.setStreamingLogLevels() + + val Array(zkQuorum, group, topics, numThreads) = args + val sparkConf = new SparkConf().setAppName("KafkaWordCount") + val ssc = new StreamingContext(sparkConf, Seconds(2)) + ssc.checkpoint("checkpoint") + + val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap + val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) + val words = lines.flatMap(_.split(" ")) + val wordCounts = words.map(x => (x, 1L)) + .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2) + wordCounts.print() + + ssc.start() + ssc.awaitTermination() + } +} + +// Produces some random words between 1 and 100. +object KafkaWordCountProducer { + + def main(args: Array[String]) { + if (args.length < 4) { + System.err.println("Usage: KafkaWordCountProducer " + + " ") + System.exit(1) + } + + val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args + + // Zookeeper connection properties + val props = new HashMap[String, Object]() + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringSerializer") + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringSerializer") + + val producer = new KafkaProducer[String, String](props) + + // Send some messages + while(true) { + (1 to messagesPerSec.toInt).foreach { messageNum => + val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString) + .mkString(" ") + + val message = new ProducerRecord[String, String](topic, null, str) + producer.send(message) + } + + Thread.sleep(1000) + } + } + +} -- cgit v1.2.3