aboutsummaryrefslogtreecommitdiff
path: root/examples/src/main/java/org
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-05-28 09:04:12 -0700
committerPatrick Wendell <patrick@databricks.com>2015-05-28 09:04:12 -0700
commit000df2f0d6af068bb188e81bbb207f0c2f43bf16 (patch)
treed9843f0570102ea4d6263ac716f7bdfd59a8d6e4 /examples/src/main/java/org
parente838a25bdb5603ef05e779225704c972ce436145 (diff)
downloadspark-000df2f0d6af068bb188e81bbb207f0c2f43bf16.tar.gz
spark-000df2f0d6af068bb188e81bbb207f0c2f43bf16.tar.bz2
spark-000df2f0d6af068bb188e81bbb207f0c2f43bf16.zip
[SPARK-7895] [STREAMING] [EXAMPLES] Move Kafka examples from scala-2.10/src to src
Since `spark-streaming-kafka` now is published for both Scala 2.10 and 2.11, we can move `KafkaWordCount` and `DirectKafkaWordCount` from `examples/scala-2.10/src/` to `examples/src/` so that they will appear in `spark-examples-***-jar` for Scala 2.11. Author: zsxwing <zsxwing@gmail.com> Closes #6436 from zsxwing/SPARK-7895 and squashes the following commits: c6052f1 [zsxwing] Update examples/pom.xml 0bcfa87 [zsxwing] Fix the sleep time b9d1256 [zsxwing] Move Kafka examples from scala-2.10/src to src
Diffstat (limited to 'examples/src/main/java/org')
-rw-r--r--examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java113
-rw-r--r--examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java113
2 files changed, 226 insertions, 0 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
new file mode 100644
index 0000000000..bab9f2478e
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Arrays;
+import java.util.regex.Pattern;
+
+import scala.Tuple2;
+
+import com.google.common.collect.Lists;
+import kafka.serializer.StringDecoder;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.streaming.api.java.*;
+import org.apache.spark.streaming.kafka.KafkaUtils;
+import org.apache.spark.streaming.Durations;
+
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ * Usage: DirectKafkaWordCount <brokers> <topics>
+ * <brokers> is a list of one or more Kafka brokers
+ * <topics> is a list of one or more kafka topics to consume from
+ *
+ * Example:
+ * $ bin/run-example streaming.KafkaWordCount broker1-host:port,broker2-host:port topic1,topic2
+ */
+
+public final class JavaDirectKafkaWordCount {
+ private static final Pattern SPACE = Pattern.compile(" ");
+
+ public static void main(String[] args) {
+ if (args.length < 2) {
+ System.err.println("Usage: DirectKafkaWordCount <brokers> <topics>\n" +
+ " <brokers> is a list of one or more Kafka brokers\n" +
+ " <topics> is a list of one or more kafka topics to consume from\n\n");
+ System.exit(1);
+ }
+
+ StreamingExamples.setStreamingLogLevels();
+
+ String brokers = args[0];
+ String topics = args[1];
+
+ // Create context with 2 second batch interval
+ SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
+ JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
+
+ HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
+ HashMap<String, String> kafkaParams = new HashMap<String, String>();
+ kafkaParams.put("metadata.broker.list", brokers);
+
+ // Create direct kafka stream with brokers and topics
+ JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
+ jssc,
+ String.class,
+ String.class,
+ StringDecoder.class,
+ StringDecoder.class,
+ kafkaParams,
+ topicsSet
+ );
+
+ // Get the lines, split them into words, count the words and print
+ JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
+ @Override
+ public String call(Tuple2<String, String> tuple2) {
+ return tuple2._2();
+ }
+ });
+ JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+ @Override
+ public Iterable<String> call(String x) {
+ return Lists.newArrayList(SPACE.split(x));
+ }
+ });
+ JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+ new PairFunction<String, String, Integer>() {
+ @Override
+ public Tuple2<String, Integer> call(String s) {
+ return new Tuple2<String, Integer>(s, 1);
+ }
+ }).reduceByKey(
+ new Function2<Integer, Integer, Integer>() {
+ @Override
+ public Integer call(Integer i1, Integer i2) {
+ return i1 + i2;
+ }
+ });
+ wordCounts.print();
+
+ // Start the computation
+ jssc.start();
+ jssc.awaitTermination();
+ }
+}
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
new file mode 100644
index 0000000000..16ae9a3319
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.regex.Pattern;
+
+
+import scala.Tuple2;
+
+import com.google.common.collect.Lists;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.examples.streaming.StreamingExamples;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka.KafkaUtils;
+
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ *
+ * Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>
+ * <zkQuorum> is a list of one or more zookeeper servers that make quorum
+ * <group> is the name of kafka consumer group
+ * <topics> is a list of one or more kafka topics to consume from
+ * <numThreads> is the number of threads the kafka consumer should use
+ *
+ * To run this example:
+ * `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, \
+ * zoo03 my-consumer-group topic1,topic2 1`
+ */
+
+public final class JavaKafkaWordCount {
+ private static final Pattern SPACE = Pattern.compile(" ");
+
+ private JavaKafkaWordCount() {
+ }
+
+ public static void main(String[] args) {
+ if (args.length < 4) {
+ System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
+ System.exit(1);
+ }
+
+ StreamingExamples.setStreamingLogLevels();
+ SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
+ // Create the context with a 1 second batch size
+ JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
+
+ int numThreads = Integer.parseInt(args[3]);
+ Map<String, Integer> topicMap = new HashMap<String, Integer>();
+ String[] topics = args[2].split(",");
+ for (String topic: topics) {
+ topicMap.put(topic, numThreads);
+ }
+
+ JavaPairReceiverInputDStream<String, String> messages =
+ KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
+
+ JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
+ @Override
+ public String call(Tuple2<String, String> tuple2) {
+ return tuple2._2();
+ }
+ });
+
+ JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+ @Override
+ public Iterable<String> call(String x) {
+ return Lists.newArrayList(SPACE.split(x));
+ }
+ });
+
+ JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+ new PairFunction<String, String, Integer>() {
+ @Override
+ public Tuple2<String, Integer> call(String s) {
+ return new Tuple2<String, Integer>(s, 1);
+ }
+ }).reduceByKey(new Function2<Integer, Integer, Integer>() {
+ @Override
+ public Integer call(Integer i1, Integer i2) {
+ return i1 + i2;
+ }
+ });
+
+ wordCounts.print();
+ jssc.start();
+ jssc.awaitTermination();
+ }
+}