diff options
author | zsxwing <zsxwing@gmail.com> | 2015-05-28 09:04:12 -0700 |
---|---|---|
committer | Patrick Wendell <patrick@databricks.com> | 2015-05-28 09:04:22 -0700 |
commit | ab62d73ddb973c25de043e8e9ade7800adf244e8 (patch) | |
tree | db684799d67e5329959b03629b410d7d4ae156d9 /examples/src/main/java | |
parent | bd568df22445a1ca5183ce357410ef7a76f5bb81 (diff) | |
download | spark-ab62d73ddb973c25de043e8e9ade7800adf244e8.tar.gz spark-ab62d73ddb973c25de043e8e9ade7800adf244e8.tar.bz2 spark-ab62d73ddb973c25de043e8e9ade7800adf244e8.zip |
[SPARK-7895] [STREAMING] [EXAMPLES] Move Kafka examples from scala-2.10/src to src
Since `spark-streaming-kafka` now is published for both Scala 2.10 and 2.11, we can move `KafkaWordCount` and `DirectKafkaWordCount` from `examples/scala-2.10/src/` to `examples/src/` so that they will appear in `spark-examples-***-jar` for Scala 2.11.
Author: zsxwing <zsxwing@gmail.com>
Closes #6436 from zsxwing/SPARK-7895 and squashes the following commits:
c6052f1 [zsxwing] Update examples/pom.xml
0bcfa87 [zsxwing] Fix the sleep time
b9d1256 [zsxwing] Move Kafka examples from scala-2.10/src to src
(cherry picked from commit 000df2f0d6af068bb188e81bbb207f0c2f43bf16)
Signed-off-by: Patrick Wendell <patrick@databricks.com>
Diffstat (limited to 'examples/src/main/java')
-rw-r--r-- | examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java | 113 | ||||
-rw-r--r-- | examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java | 113 |
2 files changed, 226 insertions, 0 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java new file mode 100644 index 0000000000..bab9f2478e --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Arrays; +import java.util.regex.Pattern; + +import scala.Tuple2; + +import com.google.common.collect.Lists; +import kafka.serializer.StringDecoder; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.*; +import org.apache.spark.streaming.api.java.*; +import org.apache.spark.streaming.kafka.KafkaUtils; +import org.apache.spark.streaming.Durations; + +/** + * Consumes messages from one or more topics in Kafka and does wordcount. + * Usage: DirectKafkaWordCount <brokers> <topics> + * <brokers> is a list of one or more Kafka brokers + * <topics> is a list of one or more kafka topics to consume from + * + * Example: + * $ bin/run-example streaming.KafkaWordCount broker1-host:port,broker2-host:port topic1,topic2 + */ + +public final class JavaDirectKafkaWordCount { + private static final Pattern SPACE = Pattern.compile(" "); + + public static void main(String[] args) { + if (args.length < 2) { + System.err.println("Usage: DirectKafkaWordCount <brokers> <topics>\n" + + " <brokers> is a list of one or more Kafka brokers\n" + + " <topics> is a list of one or more kafka topics to consume from\n\n"); + System.exit(1); + } + + StreamingExamples.setStreamingLogLevels(); + + String brokers = args[0]; + String topics = args[1]; + + // Create context with 2 second batch interval + SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount"); + JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2)); + + HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(","))); + HashMap<String, String> kafkaParams = new HashMap<String, String>(); + kafkaParams.put("metadata.broker.list", brokers); + + // Create direct kafka stream with brokers and topics + JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream( + jssc, + String.class, + String.class, + StringDecoder.class, + StringDecoder.class, + kafkaParams, + topicsSet + ); + + // Get the lines, split them into words, count the words and print + JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { + @Override + public String call(Tuple2<String, String> tuple2) { + return tuple2._2(); + } + }); + JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { + @Override + public Iterable<String> call(String x) { + return Lists.newArrayList(SPACE.split(x)); + } + }); + JavaPairDStream<String, Integer> wordCounts = words.mapToPair( + new PairFunction<String, String, Integer>() { + @Override + public Tuple2<String, Integer> call(String s) { + return new Tuple2<String, Integer>(s, 1); + } + }).reduceByKey( + new Function2<Integer, Integer, Integer>() { + @Override + public Integer call(Integer i1, Integer i2) { + return i1 + i2; + } + }); + wordCounts.print(); + + // Start the computation + jssc.start(); + jssc.awaitTermination(); + } +} diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java new file mode 100644 index 0000000000..16ae9a3319 --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming; + +import java.util.Map; +import java.util.HashMap; +import java.util.regex.Pattern; + + +import scala.Tuple2; + +import com.google.common.collect.Lists; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.examples.streaming.StreamingExamples; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.kafka.KafkaUtils; + +/** + * Consumes messages from one or more topics in Kafka and does wordcount. + * + * Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads> + * <zkQuorum> is a list of one or more zookeeper servers that make quorum + * <group> is the name of kafka consumer group + * <topics> is a list of one or more kafka topics to consume from + * <numThreads> is the number of threads the kafka consumer should use + * + * To run this example: + * `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, \ + * zoo03 my-consumer-group topic1,topic2 1` + */ + +public final class JavaKafkaWordCount { + private static final Pattern SPACE = Pattern.compile(" "); + + private JavaKafkaWordCount() { + } + + public static void main(String[] args) { + if (args.length < 4) { + System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>"); + System.exit(1); + } + + StreamingExamples.setStreamingLogLevels(); + SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount"); + // Create the context with a 1 second batch size + JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); + + int numThreads = Integer.parseInt(args[3]); + Map<String, Integer> topicMap = new HashMap<String, Integer>(); + String[] topics = args[2].split(","); + for (String topic: topics) { + topicMap.put(topic, numThreads); + } + + JavaPairReceiverInputDStream<String, String> messages = + KafkaUtils.createStream(jssc, args[0], args[1], topicMap); + + JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { + @Override + public String call(Tuple2<String, String> tuple2) { + return tuple2._2(); + } + }); + + JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { + @Override + public Iterable<String> call(String x) { + return Lists.newArrayList(SPACE.split(x)); + } + }); + + JavaPairDStream<String, Integer> wordCounts = words.mapToPair( + new PairFunction<String, String, Integer>() { + @Override + public Tuple2<String, Integer> call(String s) { + return new Tuple2<String, Integer>(s, 1); + } + }).reduceByKey(new Function2<Integer, Integer, Integer>() { + @Override + public Integer call(Integer i1, Integer i2) { + return i1 + i2; + } + }); + + wordCounts.print(); + jssc.start(); + jssc.awaitTermination(); + } +} |