diff options
Diffstat (limited to 'examples/src')
14 files changed, 164 insertions, 83 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java index 12f3355bc4..71bd3b4821 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java @@ -24,19 +24,22 @@ import org.apache.spark.api.java.function.Function2; import java.io.Serializable; import java.util.Arrays; -import java.util.StringTokenizer; import java.util.Random; +import java.util.regex.Pattern; /** * Logistic regression based classification. */ -public class JavaHdfsLR { +public final class JavaHdfsLR { - static int D = 10; // Number of dimensions - static Random rand = new Random(42); + private static final int D = 10; // Number of dimensions + private static final Random rand = new Random(42); + + private JavaHdfsLR() { + } static class DataPoint implements Serializable { - public DataPoint(double[] x, double y) { + DataPoint(double[] x, double y) { this.x = x; this.y = y; } @@ -46,20 +49,22 @@ public class JavaHdfsLR { } static class ParsePoint extends Function<String, DataPoint> { + private static final Pattern SPACE = Pattern.compile(" "); + + @Override public DataPoint call(String line) { - StringTokenizer tok = new StringTokenizer(line, " "); - double y = Double.parseDouble(tok.nextToken()); + String[] tok = SPACE.split(line); + double y = Double.parseDouble(tok[0]); double[] x = new double[D]; - int i = 0; - while (i < D) { - x[i] = Double.parseDouble(tok.nextToken()); - i += 1; + for (int i = 0; i < D; i++) { + x[i] = Double.parseDouble(tok[i+1]); } return new DataPoint(x, y); } } static class VectorSum extends Function2<double[], double[], double[]> { + @Override public double[] call(double[] a, double[] b) { double[] result = new double[D]; for (int j = 0; j < D; j++) { @@ -70,12 +75,13 @@ public class JavaHdfsLR { } static class ComputeGradient extends Function<DataPoint, double[]> { - double[] weights; + private final double[] weights; - public ComputeGradient(double[] weights) { + ComputeGradient(double[] weights) { this.weights = weights; } + @Override public double[] call(DataPoint p) { double[] gradient = new double[D]; for (int i = 0; i < D; i++) { diff --git a/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java b/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java index 63465a3bbf..0808f33e6a 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java @@ -27,19 +27,27 @@ import org.apache.spark.util.Vector; import java.util.List; import java.util.Map; +import java.util.regex.Pattern; /** * K-means clustering using Java API. */ -public class JavaKMeans { +public final class JavaKMeans { + + private static final Pattern SPACE = Pattern.compile(" "); + + private JavaKMeans() { + } /** Parses numbers split by whitespace to a vector */ static Vector parseVector(String line) { - String[] splits = line.split(" "); + String[] splits = SPACE.split(line); double[] data = new double[splits.length]; int i = 0; - for (String s : splits) - data[i] = Double.parseDouble(splits[i++]); + for (String s : splits) { + data[i] = Double.parseDouble(s); + i++; + } return new Vector(data); } @@ -82,7 +90,7 @@ public class JavaKMeans { JavaRDD<Vector> data = sc.textFile(path).map( new Function<String, Vector>() { @Override - public Vector call(String line) throws Exception { + public Vector call(String line) { return parseVector(line); } } @@ -96,7 +104,7 @@ public class JavaKMeans { JavaPairRDD<Integer, Vector> closest = data.map( new PairFunction<Vector, Integer, Vector>() { @Override - public Tuple2<Integer, Vector> call(Vector vector) throws Exception { + public Tuple2<Integer, Vector> call(Vector vector) { return new Tuple2<Integer, Vector>( closestPoint(vector, centroids), vector); } @@ -107,7 +115,8 @@ public class JavaKMeans { JavaPairRDD<Integer, List<Vector>> pointsGroup = closest.groupByKey(); Map<Integer, Vector> newCentroids = pointsGroup.mapValues( new Function<List<Vector>, Vector>() { - public Vector call(List<Vector> ps) throws Exception { + @Override + public Vector call(List<Vector> ps) { return average(ps); } }).collectAsMap(); @@ -122,8 +131,9 @@ public class JavaKMeans { } while (tempDist > convergeDist); System.out.println("Final centers:"); - for (Vector c : centroids) + for (Vector c : centroids) { System.out.println(c); + } System.exit(0); diff --git a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java index 74e4d9291a..d45d96d804 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java @@ -35,9 +35,9 @@ import java.util.regex.Pattern; /** * Executes a roll up-style query against Apache logs. */ -public class JavaLogQuery { +public final class JavaLogQuery { - public static List<String> exampleApacheLogs = Lists.newArrayList( + public static final List<String> exampleApacheLogs = Lists.newArrayList( "10.10.10.10 - \"FRED\" [18/Jan/2013:17:56:07 +1100] \"GET http://images.com/2013/Generic.jpg " + "HTTP/1.1\" 304 315 \"http://referall.com/\" \"Mozilla/4.0 (compatible; MSIE 7.0; " + "Windows NT 5.1; GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; " + @@ -51,14 +51,17 @@ public class JavaLogQuery { "3.5.30729; Release=ARP)\" \"UD-1\" - \"image/jpeg\" \"whatever\" 0.352 \"-\" - \"\" 256 977 988 \"\" " + "0 73.23.2.15 images.com 1358492557 - Whatup"); - public static Pattern apacheLogRegex = Pattern.compile( + public static final Pattern apacheLogRegex = Pattern.compile( "^([\\d.]+) (\\S+) (\\S+) \\[([\\w\\d:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) ([\\d\\-]+) \"([^\"]+)\" \"([^\"]+)\".*"); + private JavaLogQuery() { + } + /** Tracks the total query count and number of aggregate bytes for a particular group. */ public static class Stats implements Serializable { - private int count; - private int numBytes; + private final int count; + private final int numBytes; public Stats(int count, int numBytes) { this.count = count; @@ -92,12 +95,12 @@ public class JavaLogQuery { if (m.find()) { int bytes = Integer.parseInt(m.group(7)); return new Stats(1, bytes); - } - else + } else { return new Stats(1, 0); + } } - public static void main(String[] args) throws Exception { + public static void main(String[] args) { if (args.length == 0) { System.err.println("Usage: JavaLogQuery <master> [logFile]"); System.exit(1); @@ -110,14 +113,14 @@ public class JavaLogQuery { JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.map(new PairFunction<String, Tuple3<String, String, String>, Stats>() { @Override - public Tuple2<Tuple3<String, String, String>, Stats> call(String s) throws Exception { + public Tuple2<Tuple3<String, String, String>, Stats> call(String s) { return new Tuple2<Tuple3<String, String, String>, Stats>(extractKey(s), extractStats(s)); } }); JavaPairRDD<Tuple3<String, String, String>, Stats> counts = extracted.reduceByKey(new Function2<Stats, Stats, Stats>() { @Override - public Stats call(Stats stats, Stats stats2) throws Exception { + public Stats call(Stats stats, Stats stats2) { return stats.merge(stats2); } }); diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java index f774f6a04e..12d2cce1a7 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java @@ -29,6 +29,7 @@ import org.apache.spark.api.java.function.PairFunction; import java.util.List; import java.util.ArrayList; +import java.util.regex.Pattern; /** * Computes the PageRank of URLs from an input file. Input file should @@ -39,7 +40,12 @@ import java.util.ArrayList; * ... * where URL and their neighbors are separated by space(s). */ -public class JavaPageRank { +public final class JavaPageRank { + private static final Pattern SPACES = Pattern.compile("\\s+"); + + private JavaPageRank() { + } + private static class Sum extends Function2<Double, Double, Double> { @Override public Double call(Double a, Double b) { @@ -67,7 +73,7 @@ public class JavaPageRank { JavaPairRDD<String, List<String>> links = lines.map(new PairFunction<String, String, String>() { @Override public Tuple2<String, String> call(String s) { - String[] parts = s.split("\\s+"); + String[] parts = SPACES.split(s); return new Tuple2<String, String>(parts[0], parts[1]); } }).distinct().groupByKey().cache(); @@ -75,7 +81,7 @@ public class JavaPageRank { // Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one. JavaPairRDD<String, Double> ranks = links.mapValues(new Function<List<String>, Double>() { @Override - public Double call(List<String> rs) throws Exception { + public Double call(List<String> rs) { return 1.0; } }); @@ -98,7 +104,7 @@ public class JavaPageRank { // Re-calculates URL ranks based on neighbor contributions. ranks = contribs.reduceByKey(new Sum()).mapValues(new Function<Double, Double>() { @Override - public Double call(Double sum) throws Exception { + public Double call(Double sum) { return 0.15 + sum * 0.85; } }); diff --git a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java index 5558ab7c03..f6ed510e05 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java @@ -26,8 +26,10 @@ import java.util.ArrayList; import java.util.List; /** Computes an approximation to pi */ -public class JavaSparkPi { +public final class JavaSparkPi { + private JavaSparkPi() { + } public static void main(String[] args) throws Exception { if (args.length == 0) { @@ -41,21 +43,22 @@ public class JavaSparkPi { int slices = (args.length == 2) ? Integer.parseInt(args[1]) : 2; int n = 100000 * slices; List<Integer> l = new ArrayList<Integer>(n); - for (int i = 0; i < n; i++) + for (int i = 0; i < n; i++) { l.add(i); + } JavaRDD<Integer> dataSet = jsc.parallelize(l, slices); int count = dataSet.map(new Function<Integer, Integer>() { @Override - public Integer call(Integer integer) throws Exception { + public Integer call(Integer integer) { double x = Math.random() * 2 - 1; double y = Math.random() * 2 - 1; return (x * x + y * y < 1) ? 1 : 0; } }).reduce(new Function2<Integer, Integer, Integer>() { @Override - public Integer call(Integer integer, Integer integer2) throws Exception { + public Integer call(Integer integer, Integer integer2) { return integer + integer2; } }); diff --git a/examples/src/main/java/org/apache/spark/examples/JavaTC.java b/examples/src/main/java/org/apache/spark/examples/JavaTC.java index 99e6ba347c..12b564d1ef 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaTC.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaTC.java @@ -31,11 +31,14 @@ import java.util.Set; /** * Transitive closure on a graph, implemented in Java. */ -public class JavaTC { +public final class JavaTC { - static int numEdges = 200; - static int numVertices = 100; - static Random rand = new Random(42); + private static final int numEdges = 200; + private static final int numVertices = 100; + private static final Random rand = new Random(42); + + private JavaTC() { + } static List<Tuple2<Integer, Integer>> generateGraph() { Set<Tuple2<Integer, Integer>> edges = new HashSet<Tuple2<Integer, Integer>>(numEdges); @@ -43,15 +46,18 @@ public class JavaTC { int from = rand.nextInt(numVertices); int to = rand.nextInt(numVertices); Tuple2<Integer, Integer> e = new Tuple2<Integer, Integer>(from, to); - if (from != to) edges.add(e); + if (from != to) { + edges.add(e); + } } return new ArrayList<Tuple2<Integer, Integer>>(edges); } static class ProjectFn extends PairFunction<Tuple2<Integer, Tuple2<Integer, Integer>>, Integer, Integer> { - static ProjectFn INSTANCE = new ProjectFn(); + static final ProjectFn INSTANCE = new ProjectFn(); + @Override public Tuple2<Integer, Integer> call(Tuple2<Integer, Tuple2<Integer, Integer>> triple) { return new Tuple2<Integer, Integer>(triple._2()._2(), triple._2()._1()); } @@ -76,6 +82,7 @@ public class JavaTC { // Because join() joins on keys, the edges are stored in reversed order. JavaPairRDD<Integer, Integer> edges = tc.map( new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() { + @Override public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> e) { return new Tuple2<Integer, Integer>(e._2(), e._1()); } diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java index 8a071caf13..fc9beb8fe5 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java @@ -27,8 +27,14 @@ import org.apache.spark.api.java.function.PairFunction; import java.util.Arrays; import java.util.List; +import java.util.regex.Pattern; + +public final class JavaWordCount { + private static final Pattern SPACE = Pattern.compile(" "); + + private JavaWordCount() { + } -public class JavaWordCount { public static void main(String[] args) throws Exception { if (args.length < 2) { System.err.println("Usage: JavaWordCount <master> <file>"); @@ -40,18 +46,21 @@ public class JavaWordCount { JavaRDD<String> lines = ctx.textFile(args[1], 1); JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { + @Override public Iterable<String> call(String s) { - return Arrays.asList(s.split(" ")); + return Arrays.asList(SPACE.split(s)); } }); JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() { + @Override public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } }); JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() { + @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java index 5e1a77baaa..c42d9cb788 100644 --- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java +++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java @@ -26,28 +26,35 @@ import org.apache.spark.mllib.recommendation.MatrixFactorizationModel; import org.apache.spark.mllib.recommendation.Rating; import java.util.Arrays; -import java.util.StringTokenizer; +import java.util.regex.Pattern; import scala.Tuple2; /** * Example using MLLib ALS from Java. */ -public class JavaALS { +public final class JavaALS { + + private JavaALS() { + } static class ParseRating extends Function<String, Rating> { + private static final Pattern COMMA = Pattern.compile(","); + + @Override public Rating call(String line) { - StringTokenizer tok = new StringTokenizer(line, ","); - int x = Integer.parseInt(tok.nextToken()); - int y = Integer.parseInt(tok.nextToken()); - double rating = Double.parseDouble(tok.nextToken()); + String[] tok = COMMA.split(line); + int x = Integer.parseInt(tok[0]); + int y = Integer.parseInt(tok[1]); + double rating = Double.parseDouble(tok[2]); return new Rating(x, y, rating); } } static class FeaturesToString extends Function<Tuple2<Object, double[]>, String> { + @Override public String call(Tuple2<Object, double[]> element) { - return element._1().toString() + "," + Arrays.toString(element._2()); + return element._1() + "," + Arrays.toString(element._2()); } } diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java index 1f12f518a0..9d10473aed 100644 --- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java +++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java @@ -25,20 +25,25 @@ import org.apache.spark.mllib.clustering.KMeans; import org.apache.spark.mllib.clustering.KMeansModel; import java.util.Arrays; -import java.util.StringTokenizer; +import java.util.regex.Pattern; /** * Example using MLLib KMeans from Java. */ -public class JavaKMeans { +public final class JavaKMeans { + + private JavaKMeans() { + } static class ParsePoint extends Function<String, double[]> { + private static final Pattern SPACE = Pattern.compile(" "); + + @Override public double[] call(String line) { - StringTokenizer tok = new StringTokenizer(line, " "); - int numTokens = tok.countTokens(); - double[] point = new double[numTokens]; - for (int i = 0; i < numTokens; ++i) { - point[i] = Double.parseDouble(tok.nextToken()); + String[] tok = SPACE.split(line); + double[] point = new double[tok.length]; + for (int i = 0; i < tok.length; ++i) { + point[i] = Double.parseDouble(tok[i]); } return point; } diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java index 593e4df111..b057f71e08 100644 --- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java +++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java @@ -27,22 +27,28 @@ import org.apache.spark.mllib.classification.LogisticRegressionModel; import org.apache.spark.mllib.regression.LabeledPoint; import java.util.Arrays; -import java.util.StringTokenizer; +import java.util.regex.Pattern; /** * Logistic regression based classification using ML Lib. */ -public class JavaLR { +public final class JavaLR { + + private JavaLR() { + } static class ParsePoint extends Function<String, LabeledPoint> { + private static final Pattern COMMA = Pattern.compile(","); + private static final Pattern SPACE = Pattern.compile(" "); + + @Override public LabeledPoint call(String line) { - String[] parts = line.split(","); + String[] parts = COMMA.split(line); double y = Double.parseDouble(parts[0]); - StringTokenizer tok = new StringTokenizer(parts[1], " "); - int numTokens = tok.countTokens(); - double[] x = new double[numTokens]; - for (int i = 0; i < numTokens; ++i) { - x[i] = Double.parseDouble(tok.nextToken()); + String[] tok = SPACE.split(parts[1]); + double[] x = new double[tok.length]; + for (int i = 0; i < tok.length; ++i) { + x[i] = Double.parseDouble(tok[i]); } return new LabeledPoint(y, x); } diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java index 64ac72474b..fd683ce0d3 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java @@ -36,7 +36,10 @@ import org.apache.spark.streaming.dstream.SparkFlumeEvent; * creates a server and listens for flume events. * <port> is the port the Flume receiver will listen on. */ -public class JavaFlumeEventCount { +public final class JavaFlumeEventCount { + private JavaFlumeEventCount() { + } + public static void main(String[] args) { if (args.length != 3) { System.err.println("Usage: JavaFlumeEventCount <master> <host> <port>"); diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java index 0a56e7abdf..d8b4f4dddd 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java @@ -19,6 +19,7 @@ package org.apache.spark.streaming.examples; import java.util.Map; import java.util.HashMap; +import java.util.regex.Pattern; import com.google.common.collect.Lists; import org.apache.spark.api.java.function.FlatMapFunction; @@ -45,7 +46,12 @@ import scala.Tuple2; * zoo03 my-consumer-group topic1,topic2 1` */ -public class JavaKafkaWordCount { +public final class JavaKafkaWordCount { + private static final Pattern SPACE = Pattern.compile(" "); + + private JavaKafkaWordCount() { + } + public static void main(String[] args) { if (args.length < 5) { System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>"); @@ -68,7 +74,7 @@ public class JavaKafkaWordCount { JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { @Override - public String call(Tuple2<String, String> tuple2) throws Exception { + public String call(Tuple2<String, String> tuple2) { return tuple2._2(); } }); @@ -76,19 +82,19 @@ public class JavaKafkaWordCount { JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String x) { - return Lists.newArrayList(x.split(" ")); + return Lists.newArrayList(SPACE.split(x)); } }); JavaPairDStream<String, Integer> wordCounts = words.map( new PairFunction<String, String, Integer>() { @Override - public Tuple2<String, Integer> call(String s) throws Exception { + public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } }).reduceByKey(new Function2<Integer, Integer, Integer>() { @Override - public Integer call(Integer i1, Integer i2) throws Exception { + public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java index ec6f6a8c56..1e2efd359c 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java @@ -27,6 +27,8 @@ import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; +import java.util.regex.Pattern; + /** * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. * Usage: NetworkWordCount <master> <hostname> <port> @@ -38,7 +40,12 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext; * and then run the example * `$ ./run spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999` */ -public class JavaNetworkWordCount { +public final class JavaNetworkWordCount { + private static final Pattern SPACE = Pattern.compile(" "); + + private JavaNetworkWordCount() { + } + public static void main(String[] args) { if (args.length < 3) { System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" + @@ -57,18 +64,18 @@ public class JavaNetworkWordCount { JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String x) { - return Lists.newArrayList(x.split(" ")); + return Lists.newArrayList(SPACE.split(x)); } }); JavaPairDStream<String, Integer> wordCounts = words.map( new PairFunction<String, String, Integer>() { @Override - public Tuple2<String, Integer> call(String s) throws Exception { + public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } }).reduceByKey(new Function2<Integer, Integer, Integer>() { @Override - public Integer call(Integer i1, Integer i2) throws Exception { + public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java index 4b9fd52713..e05551ab83 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java @@ -31,8 +31,11 @@ import java.util.LinkedList; import java.util.List; import java.util.Queue; -public class JavaQueueStream { - public static void main(String[] args) throws InterruptedException { +public final class JavaQueueStream { + private JavaQueueStream() { + } + + public static void main(String[] args) throws Exception { if (args.length < 1) { System.err.println("Usage: JavaQueueStream <master>"); System.exit(1); @@ -62,14 +65,14 @@ public class JavaQueueStream { JavaPairDStream<Integer, Integer> mappedStream = inputStream.map( new PairFunction<Integer, Integer, Integer>() { @Override - public Tuple2<Integer, Integer> call(Integer i) throws Exception { + public Tuple2<Integer, Integer> call(Integer i) { return new Tuple2<Integer, Integer>(i % 10, 1); } }); JavaPairDStream<Integer, Integer> reducedStream = mappedStream.reduceByKey( new Function2<Integer, Integer, Integer>() { @Override - public Integer call(Integer i1, Integer i2) throws Exception { + public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); |