diff options
Diffstat (limited to 'examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java')
-rw-r--r-- | examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java | 23 |
1 files changed, 15 insertions, 8 deletions
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java index 0a56e7abdf..16b8a948e6 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java @@ -19,6 +19,7 @@ package org.apache.spark.streaming.examples; import java.util.Map; import java.util.HashMap; +import java.util.regex.Pattern; import com.google.common.collect.Lists; import org.apache.spark.api.java.function.FlatMapFunction; @@ -29,6 +30,7 @@ import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.kafka.KafkaUtils; import scala.Tuple2; /** @@ -45,7 +47,12 @@ import scala.Tuple2; * zoo03 my-consumer-group topic1,topic2 1` */ -public class JavaKafkaWordCount { +public final class JavaKafkaWordCount { + private static final Pattern SPACE = Pattern.compile(" "); + + private JavaKafkaWordCount() { + } + public static void main(String[] args) { if (args.length < 5) { System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>"); @@ -53,7 +60,7 @@ public class JavaKafkaWordCount { } // Create the context with a 1 second batch size - JavaStreamingContext ssc = new JavaStreamingContext(args[0], "KafkaWordCount", + JavaStreamingContext jssc = new JavaStreamingContext(args[0], "KafkaWordCount", new Duration(2000), System.getenv("SPARK_HOME"), JavaStreamingContext.jarOfClass(JavaKafkaWordCount.class)); @@ -64,11 +71,11 @@ public class JavaKafkaWordCount { topicMap.put(topic, numThreads); } - JavaPairDStream<String, String> messages = ssc.kafkaStream(args[1], args[2], topicMap); + JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, args[1], args[2], topicMap); JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { @Override - public String call(Tuple2<String, String> tuple2) throws Exception { + public String call(Tuple2<String, String> tuple2) { return tuple2._2(); } }); @@ -76,24 +83,24 @@ public class JavaKafkaWordCount { JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String x) { - return Lists.newArrayList(x.split(" ")); + return Lists.newArrayList(SPACE.split(x)); } }); JavaPairDStream<String, Integer> wordCounts = words.map( new PairFunction<String, String, Integer>() { @Override - public Tuple2<String, Integer> call(String s) throws Exception { + public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } }).reduceByKey(new Function2<Integer, Integer, Integer>() { @Override - public Integer call(Integer i1, Integer i2) throws Exception { + public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); wordCounts.print(); - ssc.start(); + jssc.start(); } } |