diff options
author | zsxwing <zsxwing@gmail.com> | 2015-05-28 09:04:12 -0700 |
---|---|---|
committer | Patrick Wendell <patrick@databricks.com> | 2015-05-28 09:04:12 -0700 |
commit | 000df2f0d6af068bb188e81bbb207f0c2f43bf16 (patch) | |
tree | d9843f0570102ea4d6263ac716f7bdfd59a8d6e4 /examples/scala-2.10/src/main/java | |
parent | e838a25bdb5603ef05e779225704c972ce436145 (diff) | |
download | spark-000df2f0d6af068bb188e81bbb207f0c2f43bf16.tar.gz spark-000df2f0d6af068bb188e81bbb207f0c2f43bf16.tar.bz2 spark-000df2f0d6af068bb188e81bbb207f0c2f43bf16.zip |
[SPARK-7895] [STREAMING] [EXAMPLES] Move Kafka examples from scala-2.10/src to src
Since `spark-streaming-kafka` now is published for both Scala 2.10 and 2.11, we can move `KafkaWordCount` and `DirectKafkaWordCount` from `examples/scala-2.10/src/` to `examples/src/` so that they will appear in `spark-examples-***-jar` for Scala 2.11.
Author: zsxwing <zsxwing@gmail.com>
Closes #6436 from zsxwing/SPARK-7895 and squashes the following commits:
c6052f1 [zsxwing] Update examples/pom.xml
0bcfa87 [zsxwing] Fix the sleep time
b9d1256 [zsxwing] Move Kafka examples from scala-2.10/src to src
Diffstat (limited to 'examples/scala-2.10/src/main/java')
2 files changed, 0 insertions, 226 deletions
diff --git a/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java deleted file mode 100644 index bab9f2478e..0000000000 --- a/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.examples.streaming; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Arrays; -import java.util.regex.Pattern; - -import scala.Tuple2; - -import com.google.common.collect.Lists; -import kafka.serializer.StringDecoder; - -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.*; -import org.apache.spark.streaming.api.java.*; -import org.apache.spark.streaming.kafka.KafkaUtils; -import org.apache.spark.streaming.Durations; - -/** - * Consumes messages from one or more topics in Kafka and does wordcount. - * Usage: DirectKafkaWordCount <brokers> <topics> - * <brokers> is a list of one or more Kafka brokers - * <topics> is a list of one or more kafka topics to consume from - * - * Example: - * $ bin/run-example streaming.KafkaWordCount broker1-host:port,broker2-host:port topic1,topic2 - */ - -public final class JavaDirectKafkaWordCount { - private static final Pattern SPACE = Pattern.compile(" "); - - public static void main(String[] args) { - if (args.length < 2) { - System.err.println("Usage: DirectKafkaWordCount <brokers> <topics>\n" + - " <brokers> is a list of one or more Kafka brokers\n" + - " <topics> is a list of one or more kafka topics to consume from\n\n"); - System.exit(1); - } - - StreamingExamples.setStreamingLogLevels(); - - String brokers = args[0]; - String topics = args[1]; - - // Create context with 2 second batch interval - SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount"); - JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2)); - - HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(","))); - HashMap<String, String> kafkaParams = new HashMap<String, String>(); - kafkaParams.put("metadata.broker.list", brokers); - - // Create direct kafka stream with brokers and topics - JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream( - jssc, - String.class, - String.class, - StringDecoder.class, - StringDecoder.class, - kafkaParams, - topicsSet - ); - - // Get the lines, split them into words, count the words and print - JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { - @Override - public String call(Tuple2<String, String> tuple2) { - return tuple2._2(); - } - }); - JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { - @Override - public Iterable<String> call(String x) { - return Lists.newArrayList(SPACE.split(x)); - } - }); - JavaPairDStream<String, Integer> wordCounts = words.mapToPair( - new PairFunction<String, String, Integer>() { - @Override - public Tuple2<String, Integer> call(String s) { - return new Tuple2<String, Integer>(s, 1); - } - }).reduceByKey( - new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 + i2; - } - }); - wordCounts.print(); - - // Start the computation - jssc.start(); - jssc.awaitTermination(); - } -} diff --git a/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java deleted file mode 100644 index 16ae9a3319..0000000000 --- a/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.examples.streaming; - -import java.util.Map; -import java.util.HashMap; -import java.util.regex.Pattern; - - -import scala.Tuple2; - -import com.google.common.collect.Lists; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.examples.streaming.StreamingExamples; -import org.apache.spark.streaming.Duration; -import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.api.java.JavaPairDStream; -import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; -import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.apache.spark.streaming.kafka.KafkaUtils; - -/** - * Consumes messages from one or more topics in Kafka and does wordcount. - * - * Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads> - * <zkQuorum> is a list of one or more zookeeper servers that make quorum - * <group> is the name of kafka consumer group - * <topics> is a list of one or more kafka topics to consume from - * <numThreads> is the number of threads the kafka consumer should use - * - * To run this example: - * `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, \ - * zoo03 my-consumer-group topic1,topic2 1` - */ - -public final class JavaKafkaWordCount { - private static final Pattern SPACE = Pattern.compile(" "); - - private JavaKafkaWordCount() { - } - - public static void main(String[] args) { - if (args.length < 4) { - System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>"); - System.exit(1); - } - - StreamingExamples.setStreamingLogLevels(); - SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount"); - // Create the context with a 1 second batch size - JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); - - int numThreads = Integer.parseInt(args[3]); - Map<String, Integer> topicMap = new HashMap<String, Integer>(); - String[] topics = args[2].split(","); - for (String topic: topics) { - topicMap.put(topic, numThreads); - } - - JavaPairReceiverInputDStream<String, String> messages = - KafkaUtils.createStream(jssc, args[0], args[1], topicMap); - - JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { - @Override - public String call(Tuple2<String, String> tuple2) { - return tuple2._2(); - } - }); - - JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { - @Override - public Iterable<String> call(String x) { - return Lists.newArrayList(SPACE.split(x)); - } - }); - - JavaPairDStream<String, Integer> wordCounts = words.mapToPair( - new PairFunction<String, String, Integer>() { - @Override - public Tuple2<String, Integer> call(String s) { - return new Tuple2<String, Integer>(s, 1); - } - }).reduceByKey(new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 + i2; - } - }); - - wordCounts.print(); - jssc.start(); - jssc.awaitTermination(); - } -} |