From 66d501276b5a066bd9abaa4e284cfad557665948 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Thu, 2 Jan 2014 16:17:57 +0000 Subject: Suggested small changes to Java code for slightly more standard style, encapsulation and in some cases performance --- .../org/apache/spark/network/netty/FileClient.java | 5 ++- .../netty/FileClientChannelInitializer.java | 2 +- .../org/apache/spark/network/netty/FileServer.java | 8 ++-- .../netty/FileServerChannelInitializer.java | 4 +- .../spark/network/netty/FileServerHandler.java | 6 +-- .../apache/spark/network/netty/PathResolver.java | 52 +++++++++++----------- .../java/org/apache/spark/examples/JavaHdfsLR.java | 32 +++++++------ .../java/org/apache/spark/examples/JavaKMeans.java | 26 +++++++---- .../org/apache/spark/examples/JavaLogQuery.java | 23 +++++----- .../org/apache/spark/examples/JavaPageRank.java | 14 ++++-- .../org/apache/spark/examples/JavaSparkPi.java | 11 +++-- .../java/org/apache/spark/examples/JavaTC.java | 19 +++++--- .../org/apache/spark/examples/JavaWordCount.java | 13 +++++- .../org/apache/spark/mllib/examples/JavaALS.java | 21 ++++++--- .../apache/spark/mllib/examples/JavaKMeans.java | 19 +++++--- .../org/apache/spark/mllib/examples/JavaLR.java | 22 +++++---- .../streaming/examples/JavaFlumeEventCount.java | 5 ++- .../streaming/examples/JavaKafkaWordCount.java | 16 ++++--- .../streaming/examples/JavaNetworkWordCount.java | 15 +++++-- .../spark/streaming/examples/JavaQueueStream.java | 11 +++-- 20 files changed, 203 insertions(+), 121 deletions(-) diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClient.java b/core/src/main/java/org/apache/spark/network/netty/FileClient.java index 46d61503bc..d2d778b756 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileClient.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileClient.java @@ -31,7 +31,8 @@ import java.util.concurrent.TimeUnit; class FileClient { - private Logger LOG = LoggerFactory.getLogger(this.getClass().getName()); + private static final Logger LOG = LoggerFactory.getLogger(FileClient.class.getName()); + private final FileClientHandler handler; private Channel channel = null; private Bootstrap bootstrap = null; @@ -39,7 +40,7 @@ class FileClient { private final int connectTimeout; private final int sendTimeout = 60; // 1 min - public FileClient(FileClientHandler handler, int connectTimeout) { + FileClient(FileClientHandler handler, int connectTimeout) { this.handler = handler; this.connectTimeout = connectTimeout; } diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java b/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java index fb61be1c12..264cf97d02 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java @@ -25,7 +25,7 @@ class FileClientChannelInitializer extends ChannelInitializer { private final FileClientHandler fhandler; - public FileClientChannelInitializer(FileClientHandler handler) { + FileClientChannelInitializer(FileClientHandler handler) { fhandler = handler; } diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServer.java b/core/src/main/java/org/apache/spark/network/netty/FileServer.java index aea7534459..c93425e278 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileServer.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileServer.java @@ -33,15 +33,14 @@ import org.slf4j.LoggerFactory; */ class FileServer { - private Logger LOG = LoggerFactory.getLogger(this.getClass().getName()); + private static final Logger LOG = LoggerFactory.getLogger(FileServer.class.getName()); private EventLoopGroup bossGroup = null; private EventLoopGroup workerGroup = null; private ChannelFuture channelFuture = null; private int port = 0; - private Thread blockingThread = null; - public FileServer(PathResolver pResolver, int port) { + FileServer(PathResolver pResolver, int port) { InetSocketAddress addr = new InetSocketAddress(port); // Configure the server. @@ -70,7 +69,8 @@ class FileServer { * Start the file server asynchronously in a new thread. */ public void start() { - blockingThread = new Thread() { + Thread blockingThread = new Thread() { + @Override public void run() { try { channelFuture.channel().closeFuture().sync(); diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java b/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java index 3f15ff898f..46efec8f8d 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java @@ -25,9 +25,9 @@ import io.netty.handler.codec.string.StringDecoder; class FileServerChannelInitializer extends ChannelInitializer { - PathResolver pResolver; + private final PathResolver pResolver; - public FileServerChannelInitializer(PathResolver pResolver) { + FileServerChannelInitializer(PathResolver pResolver) { this.pResolver = pResolver; } diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java index e2d9391b4c..3ac045f944 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java @@ -31,11 +31,11 @@ import org.slf4j.LoggerFactory; class FileServerHandler extends SimpleChannelInboundHandler { - private Logger LOG = LoggerFactory.getLogger(this.getClass().getName()); + private static final Logger LOG = LoggerFactory.getLogger(FileServerHandler.class.getName()); private final PathResolver pResolver; - public FileServerHandler(PathResolver pResolver){ + FileServerHandler(PathResolver pResolver){ this.pResolver = pResolver; } @@ -61,7 +61,7 @@ class FileServerHandler extends SimpleChannelInboundHandler { ctx.flush(); return; } - int len = new Long(length).intValue(); + int len = (int) length; ctx.write((new FileHeader(len, blockId)).buffer()); try { ctx.write(new DefaultFileRegion(new FileInputStream(file) diff --git a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java index 9f7ced44cf..7ad8d03efb 100755 --- a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java +++ b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java @@ -1,26 +1,26 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.network.netty; - -import org.apache.spark.storage.BlockId; -import org.apache.spark.storage.FileSegment; - -public interface PathResolver { - /** Get the file segment in which the given block resides. */ - public FileSegment getBlockLocation(BlockId blockId); -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.netty; + +import org.apache.spark.storage.BlockId; +import org.apache.spark.storage.FileSegment; + +public interface PathResolver { + /** Get the file segment in which the given block resides. */ + FileSegment getBlockLocation(BlockId blockId); +} diff --git a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java index be0d38589c..9f0e3412a6 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java @@ -24,19 +24,22 @@ import org.apache.spark.api.java.function.Function2; import java.io.Serializable; import java.util.Arrays; -import java.util.StringTokenizer; import java.util.Random; +import java.util.regex.Pattern; /** * Logistic regression based classification. */ -public class JavaHdfsLR { +public final class JavaHdfsLR { - static int D = 10; // Number of dimensions - static Random rand = new Random(42); + private static final int D = 10; // Number of dimensions + private static final Random rand = new Random(42); + + private JavaHdfsLR() { + } static class DataPoint implements Serializable { - public DataPoint(double[] x, double y) { + DataPoint(double[] x, double y) { this.x = x; this.y = y; } @@ -46,20 +49,22 @@ public class JavaHdfsLR { } static class ParsePoint extends Function { + private static final Pattern SPACE = Pattern.compile(" "); + + @Override public DataPoint call(String line) { - StringTokenizer tok = new StringTokenizer(line, " "); - double y = Double.parseDouble(tok.nextToken()); + String[] tok = SPACE.split(line); + double y = Double.parseDouble(tok[0]); double[] x = new double[D]; - int i = 0; - while (i < D) { - x[i] = Double.parseDouble(tok.nextToken()); - i += 1; + for (int i = 0; i < D; i++) { + x[i] = Double.parseDouble(tok[i+1]); } return new DataPoint(x, y); } } static class VectorSum extends Function2 { + @Override public double[] call(double[] a, double[] b) { double[] result = new double[D]; for (int j = 0; j < D; j++) { @@ -70,12 +75,13 @@ public class JavaHdfsLR { } static class ComputeGradient extends Function { - double[] weights; + private final double[] weights; - public ComputeGradient(double[] weights) { + ComputeGradient(double[] weights) { this.weights = weights; } + @Override public double[] call(DataPoint p) { double[] gradient = new double[D]; for (int i = 0; i < D; i++) { diff --git a/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java b/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java index 5a6afe7eae..1671d0cdc8 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java @@ -27,19 +27,27 @@ import org.apache.spark.util.Vector; import java.util.List; import java.util.Map; +import java.util.regex.Pattern; /** * K-means clustering using Java API. */ -public class JavaKMeans { +public final class JavaKMeans { + + private static final Pattern SPACE = Pattern.compile(" "); + + private JavaKMeans() { + } /** Parses numbers split by whitespace to a vector */ static Vector parseVector(String line) { - String[] splits = line.split(" "); + String[] splits = SPACE.split(line); double[] data = new double[splits.length]; int i = 0; - for (String s : splits) - data[i] = Double.parseDouble(splits[i++]); + for (String s : splits) { + data[i] = Double.parseDouble(s); + i++; + } return new Vector(data); } @@ -82,7 +90,7 @@ public class JavaKMeans { JavaRDD data = sc.textFile(path).map( new Function() { @Override - public Vector call(String line) throws Exception { + public Vector call(String line) { return parseVector(line); } } @@ -96,7 +104,7 @@ public class JavaKMeans { JavaPairRDD closest = data.map( new PairFunction() { @Override - public Tuple2 call(Vector vector) throws Exception { + public Tuple2 call(Vector vector) { return new Tuple2( closestPoint(vector, centroids), vector); } @@ -107,7 +115,8 @@ public class JavaKMeans { JavaPairRDD> pointsGroup = closest.groupByKey(); Map newCentroids = pointsGroup.mapValues( new Function, Vector>() { - public Vector call(List ps) throws Exception { + @Override + public Vector call(List ps) { return average(ps); } }).collectAsMap(); @@ -122,8 +131,9 @@ public class JavaKMeans { } while (tempDist > convergeDist); System.out.println("Final centers:"); - for (Vector c : centroids) + for (Vector c : centroids) { System.out.println(c); + } System.exit(0); diff --git a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java index 407cd7ccfa..1ce53fe403 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java @@ -35,9 +35,9 @@ import java.util.regex.Pattern; /** * Executes a roll up-style query against Apache logs. */ -public class JavaLogQuery { +public final class JavaLogQuery { - public static List exampleApacheLogs = Lists.newArrayList( + public static final List exampleApacheLogs = Lists.newArrayList( "10.10.10.10 - \"FRED\" [18/Jan/2013:17:56:07 +1100] \"GET http://images.com/2013/Generic.jpg " + "HTTP/1.1\" 304 315 \"http://referall.com/\" \"Mozilla/4.0 (compatible; MSIE 7.0; " + "Windows NT 5.1; GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; " + @@ -51,14 +51,17 @@ public class JavaLogQuery { "3.5.30729; Release=ARP)\" \"UD-1\" - \"image/jpeg\" \"whatever\" 0.352 \"-\" - \"\" 256 977 988 \"\" " + "0 73.23.2.15 images.com 1358492557 - Whatup"); - public static Pattern apacheLogRegex = Pattern.compile( + public static final Pattern apacheLogRegex = Pattern.compile( "^([\\d.]+) (\\S+) (\\S+) \\[([\\w\\d:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) ([\\d\\-]+) \"([^\"]+)\" \"([^\"]+)\".*"); + private JavaLogQuery() { + } + /** Tracks the total query count and number of aggregate bytes for a particular group. */ public static class Stats implements Serializable { - private int count; - private int numBytes; + private final int count; + private final int numBytes; public Stats(int count, int numBytes) { this.count = count; @@ -92,12 +95,12 @@ public class JavaLogQuery { if (m.find()) { int bytes = Integer.parseInt(m.group(7)); return new Stats(1, bytes); - } - else + } else { return new Stats(1, 0); + } } - public static void main(String[] args) throws Exception { + public static void main(String[] args) { if (args.length == 0) { System.err.println("Usage: JavaLogQuery [logFile]"); System.exit(1); @@ -110,14 +113,14 @@ public class JavaLogQuery { JavaPairRDD, Stats> extracted = dataSet.map(new PairFunction, Stats>() { @Override - public Tuple2, Stats> call(String s) throws Exception { + public Tuple2, Stats> call(String s) { return new Tuple2, Stats>(extractKey(s), extractStats(s)); } }); JavaPairRDD, Stats> counts = extracted.reduceByKey(new Function2() { @Override - public Stats call(Stats stats, Stats stats2) throws Exception { + public Stats call(Stats stats, Stats stats2) { return stats.merge(stats2); } }); diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java index 89aed8f279..447ba93bd6 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java @@ -28,6 +28,7 @@ import org.apache.spark.api.java.function.PairFunction; import java.util.List; import java.util.ArrayList; +import java.util.regex.Pattern; /** * Computes the PageRank of URLs from an input file. Input file should @@ -38,7 +39,12 @@ import java.util.ArrayList; * ... * where URL and their neighbors are separated by space(s). */ -public class JavaPageRank { +public final class JavaPageRank { + private static final Pattern SPACES = Pattern.compile("\\s+"); + + private JavaPageRank() { + } + private static class Sum extends Function2 { @Override public Double call(Double a, Double b) { @@ -66,7 +72,7 @@ public class JavaPageRank { JavaPairRDD> links = lines.map(new PairFunction() { @Override public Tuple2 call(String s) { - String[] parts = s.split("\\s+"); + String[] parts = SPACES.split(s); return new Tuple2(parts[0], parts[1]); } }).distinct().groupByKey().cache(); @@ -74,7 +80,7 @@ public class JavaPageRank { // Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one. JavaPairRDD ranks = links.mapValues(new Function, Double>() { @Override - public Double call(List rs) throws Exception { + public Double call(List rs) { return 1.0; } }); @@ -97,7 +103,7 @@ public class JavaPageRank { // Re-calculates URL ranks based on neighbor contributions. ranks = contribs.reduceByKey(new Sum()).mapValues(new Function() { @Override - public Double call(Double sum) throws Exception { + public Double call(Double sum) { return 0.15 + sum * 0.85; } }); diff --git a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java index 4a2380caf5..d2a2a1db7c 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java @@ -26,8 +26,10 @@ import java.util.ArrayList; import java.util.List; /** Computes an approximation to pi */ -public class JavaSparkPi { +public final class JavaSparkPi { + private JavaSparkPi() { + } public static void main(String[] args) throws Exception { if (args.length == 0) { @@ -41,21 +43,22 @@ public class JavaSparkPi { int slices = (args.length == 2) ? Integer.parseInt(args[1]) : 2; int n = 100000 * slices; List l = new ArrayList(n); - for (int i = 0; i < n; i++) + for (int i = 0; i < n; i++) { l.add(i); + } JavaRDD dataSet = jsc.parallelize(l, slices); int count = dataSet.map(new Function() { @Override - public Integer call(Integer integer) throws Exception { + public Integer call(Integer integer) { double x = Math.random() * 2 - 1; double y = Math.random() * 2 - 1; return (x * x + y * y < 1) ? 1 : 0; } }).reduce(new Function2() { @Override - public Integer call(Integer integer, Integer integer2) throws Exception { + public Integer call(Integer integer, Integer integer2) { return integer + integer2; } }); diff --git a/examples/src/main/java/org/apache/spark/examples/JavaTC.java b/examples/src/main/java/org/apache/spark/examples/JavaTC.java index 17f21f6b77..e61b9c4f0e 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaTC.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaTC.java @@ -31,11 +31,14 @@ import java.util.Set; /** * Transitive closure on a graph, implemented in Java. */ -public class JavaTC { +public final class JavaTC { - static int numEdges = 200; - static int numVertices = 100; - static Random rand = new Random(42); + private static final int numEdges = 200; + private static final int numVertices = 100; + private static final Random rand = new Random(42); + + private JavaTC() { + } static List> generateGraph() { Set> edges = new HashSet>(numEdges); @@ -43,15 +46,18 @@ public class JavaTC { int from = rand.nextInt(numVertices); int to = rand.nextInt(numVertices); Tuple2 e = new Tuple2(from, to); - if (from != to) edges.add(e); + if (from != to) { + edges.add(e); + } } return new ArrayList>(edges); } static class ProjectFn extends PairFunction>, Integer, Integer> { - static ProjectFn INSTANCE = new ProjectFn(); + static final ProjectFn INSTANCE = new ProjectFn(); + @Override public Tuple2 call(Tuple2> triple) { return new Tuple2(triple._2()._2(), triple._2()._1()); } @@ -76,6 +82,7 @@ public class JavaTC { // Because join() joins on keys, the edges are stored in reversed order. JavaPairRDD edges = tc.map( new PairFunction, Integer, Integer>() { + @Override public Tuple2 call(Tuple2 e) { return new Tuple2(e._2(), e._1()); } diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java index bd6383e13d..ed4e9b49d0 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java @@ -27,8 +27,14 @@ import org.apache.spark.api.java.function.PairFunction; import java.util.Arrays; import java.util.List; +import java.util.regex.Pattern; + +public final class JavaWordCount { + private static final Pattern SPACE = Pattern.compile(" "); + + private JavaWordCount() { + } -public class JavaWordCount { public static void main(String[] args) throws Exception { if (args.length < 2) { System.err.println("Usage: JavaWordCount "); @@ -40,18 +46,21 @@ public class JavaWordCount { JavaRDD lines = ctx.textFile(args[1], 1); JavaRDD words = lines.flatMap(new FlatMapFunction() { + @Override public Iterable call(String s) { - return Arrays.asList(s.split(" ")); + return Arrays.asList(SPACE.split(s)); } }); JavaPairRDD ones = words.map(new PairFunction() { + @Override public Tuple2 call(String s) { return new Tuple2(s, 1); } }); JavaPairRDD counts = ones.reduceByKey(new Function2() { + @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java index 45a0d237da..b33e648147 100644 --- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java +++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java @@ -26,28 +26,35 @@ import org.apache.spark.mllib.recommendation.MatrixFactorizationModel; import org.apache.spark.mllib.recommendation.Rating; import java.util.Arrays; -import java.util.StringTokenizer; +import java.util.regex.Pattern; import scala.Tuple2; /** * Example using MLLib ALS from Java. */ -public class JavaALS { +public final class JavaALS { + + private JavaALS() { + } static class ParseRating extends Function { + private static final Pattern COMMA = Pattern.compile(","); + + @Override public Rating call(String line) { - StringTokenizer tok = new StringTokenizer(line, ","); - int x = Integer.parseInt(tok.nextToken()); - int y = Integer.parseInt(tok.nextToken()); - double rating = Double.parseDouble(tok.nextToken()); + String[] tok = COMMA.split(line); + int x = Integer.parseInt(tok[0]); + int y = Integer.parseInt(tok[1]); + double rating = Double.parseDouble(tok[2]); return new Rating(x, y, rating); } } static class FeaturesToString extends Function, String> { + @Override public String call(Tuple2 element) { - return element._1().toString() + "," + Arrays.toString(element._2()); + return element._1() + "," + Arrays.toString(element._2()); } } diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java index cd59a139b9..a9db04d525 100644 --- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java +++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java @@ -25,20 +25,25 @@ import org.apache.spark.mllib.clustering.KMeans; import org.apache.spark.mllib.clustering.KMeansModel; import java.util.Arrays; -import java.util.StringTokenizer; +import java.util.regex.Pattern; /** * Example using MLLib KMeans from Java. */ -public class JavaKMeans { +public final class JavaKMeans { + + private JavaKMeans() { + } static class ParsePoint extends Function { + private static final Pattern SPACE = Pattern.compile(" "); + + @Override public double[] call(String line) { - StringTokenizer tok = new StringTokenizer(line, " "); - int numTokens = tok.countTokens(); - double[] point = new double[numTokens]; - for (int i = 0; i < numTokens; ++i) { - point[i] = Double.parseDouble(tok.nextToken()); + String[] tok = SPACE.split(line); + double[] point = new double[tok.length]; + for (int i = 0; i < tok.length; ++i) { + point[i] = Double.parseDouble(tok[i]); } return point; } diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java index 258061c8e6..56341315bf 100644 --- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java +++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java @@ -27,22 +27,28 @@ import org.apache.spark.mllib.classification.LogisticRegressionModel; import org.apache.spark.mllib.regression.LabeledPoint; import java.util.Arrays; -import java.util.StringTokenizer; +import java.util.regex.Pattern; /** * Logistic regression based classification using ML Lib. */ -public class JavaLR { +public final class JavaLR { + + private JavaLR() { + } static class ParsePoint extends Function { + private static final Pattern COMMA = Pattern.compile(","); + private static final Pattern SPACE = Pattern.compile(" "); + + @Override public LabeledPoint call(String line) { - String[] parts = line.split(","); + String[] parts = COMMA.split(line); double y = Double.parseDouble(parts[0]); - StringTokenizer tok = new StringTokenizer(parts[1], " "); - int numTokens = tok.countTokens(); - double[] x = new double[numTokens]; - for (int i = 0; i < numTokens; ++i) { - x[i] = Double.parseDouble(tok.nextToken()); + String[] tok = SPACE.split(parts[1]); + double[] x = new double[tok.length]; + for (int i = 0; i < tok.length; ++i) { + x[i] = Double.parseDouble(tok[i]); } return new LabeledPoint(y, x); } diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java index 261813bf2f..bd0bbb56ff 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java @@ -36,7 +36,10 @@ import org.apache.spark.streaming.dstream.SparkFlumeEvent; * creates a server and listens for flume events. * is the port the Flume receiver will listen on. */ -public class JavaFlumeEventCount { +public final class JavaFlumeEventCount { + private JavaFlumeEventCount() { + } + public static void main(String[] args) { if (args.length != 3) { System.err.println("Usage: JavaFlumeEventCount "); diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java index 22994fb2ec..17eb871908 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java @@ -19,6 +19,7 @@ package org.apache.spark.streaming.examples; import java.util.Map; import java.util.HashMap; +import java.util.regex.Pattern; import com.google.common.collect.Lists; import org.apache.spark.api.java.function.FlatMapFunction; @@ -45,7 +46,12 @@ import scala.Tuple2; * zoo03 my-consumer-group topic1,topic2 1` */ -public class JavaKafkaWordCount { +public final class JavaKafkaWordCount { + private static final Pattern SPACE = Pattern.compile(" "); + + private JavaKafkaWordCount() { + } + public static void main(String[] args) { if (args.length < 5) { System.err.println("Usage: KafkaWordCount "); @@ -67,7 +73,7 @@ public class JavaKafkaWordCount { JavaDStream lines = messages.map(new Function, String>() { @Override - public String call(Tuple2 tuple2) throws Exception { + public String call(Tuple2 tuple2) { return tuple2._2(); } }); @@ -75,19 +81,19 @@ public class JavaKafkaWordCount { JavaDStream words = lines.flatMap(new FlatMapFunction() { @Override public Iterable call(String x) { - return Lists.newArrayList(x.split(" ")); + return Lists.newArrayList(SPACE.split(x)); } }); JavaPairDStream wordCounts = words.map( new PairFunction() { @Override - public Tuple2 call(String s) throws Exception { + public Tuple2 call(String s) { return new Tuple2(s, 1); } }).reduceByKey(new Function2() { @Override - public Integer call(Integer i1, Integer i2) throws Exception { + public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java index def87c199b..fb090cc262 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java @@ -27,6 +27,8 @@ import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; +import java.util.regex.Pattern; + /** * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. * Usage: NetworkWordCount @@ -38,7 +40,12 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext; * and then run the example * `$ ./run spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999` */ -public class JavaNetworkWordCount { +public final class JavaNetworkWordCount { + private static final Pattern SPACE = Pattern.compile(" "); + + private JavaNetworkWordCount() { + } + public static void main(String[] args) { if (args.length < 3) { System.err.println("Usage: NetworkWordCount \n" + @@ -56,18 +63,18 @@ public class JavaNetworkWordCount { JavaDStream words = lines.flatMap(new FlatMapFunction() { @Override public Iterable call(String x) { - return Lists.newArrayList(x.split(" ")); + return Lists.newArrayList(SPACE.split(x)); } }); JavaPairDStream wordCounts = words.map( new PairFunction() { @Override - public Tuple2 call(String s) throws Exception { + public Tuple2 call(String s) { return new Tuple2(s, 1); } }).reduceByKey(new Function2() { @Override - public Integer call(Integer i1, Integer i2) throws Exception { + public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java index c8c7389dd1..6be967237c 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java @@ -31,8 +31,11 @@ import java.util.LinkedList; import java.util.List; import java.util.Queue; -public class JavaQueueStream { - public static void main(String[] args) throws InterruptedException { +public final class JavaQueueStream { + private JavaQueueStream() { + } + + public static void main(String[] args) throws Exception { if (args.length < 1) { System.err.println("Usage: JavaQueueStream "); System.exit(1); @@ -62,14 +65,14 @@ public class JavaQueueStream { JavaPairDStream mappedStream = inputStream.map( new PairFunction() { @Override - public Tuple2 call(Integer i) throws Exception { + public Tuple2 call(Integer i) { return new Tuple2(i % 10, 1); } }); JavaPairDStream reducedStream = mappedStream.reduceByKey( new Function2() { @Override - public Integer call(Integer i1, Integer i2) throws Exception { + public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); -- cgit v1.2.3 From 4b92a20232bc24fd858ed4eb7c45462241e36829 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Tue, 7 Jan 2014 09:38:45 +0000 Subject: Issue #318 : minor style updates per review from Reynold Xin --- examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java | 5 +---- examples/src/main/java/org/apache/spark/examples/JavaKMeans.java | 3 --- examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java | 3 --- examples/src/main/java/org/apache/spark/examples/JavaPageRank.java | 4 ---- examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java | 3 --- examples/src/main/java/org/apache/spark/examples/JavaTC.java | 3 --- examples/src/main/java/org/apache/spark/examples/JavaWordCount.java | 3 --- examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java | 5 +---- .../src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java | 3 --- examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java | 3 --- 10 files changed, 2 insertions(+), 33 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java index 71bd3b4821..d552c47b22 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java @@ -35,9 +35,6 @@ public final class JavaHdfsLR { private static final int D = 10; // Number of dimensions private static final Random rand = new Random(42); - private JavaHdfsLR() { - } - static class DataPoint implements Serializable { DataPoint(double[] x, double y) { this.x = x; @@ -57,7 +54,7 @@ public final class JavaHdfsLR { double y = Double.parseDouble(tok[0]); double[] x = new double[D]; for (int i = 0; i < D; i++) { - x[i] = Double.parseDouble(tok[i+1]); + x[i] = Double.parseDouble(tok[i + 1]); } return new DataPoint(x, y); } diff --git a/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java b/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java index 0808f33e6a..0dc879275a 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java @@ -36,9 +36,6 @@ public final class JavaKMeans { private static final Pattern SPACE = Pattern.compile(" "); - private JavaKMeans() { - } - /** Parses numbers split by whitespace to a vector */ static Vector parseVector(String line) { String[] splits = SPACE.split(line); diff --git a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java index d45d96d804..9eb1cadd71 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java @@ -54,9 +54,6 @@ public final class JavaLogQuery { public static final Pattern apacheLogRegex = Pattern.compile( "^([\\d.]+) (\\S+) (\\S+) \\[([\\w\\d:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) ([\\d\\-]+) \"([^\"]+)\" \"([^\"]+)\".*"); - private JavaLogQuery() { - } - /** Tracks the total query count and number of aggregate bytes for a particular group. */ public static class Stats implements Serializable { diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java index 12d2cce1a7..a84245b0c7 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java @@ -17,7 +17,6 @@ package org.apache.spark.examples; -import org.apache.spark.SparkContext; import scala.Tuple2; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; @@ -43,9 +42,6 @@ import java.util.regex.Pattern; public final class JavaPageRank { private static final Pattern SPACES = Pattern.compile("\\s+"); - private JavaPageRank() { - } - private static class Sum extends Function2 { @Override public Double call(Double a, Double b) { diff --git a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java index f6ed510e05..3ec4a58d48 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java @@ -28,9 +28,6 @@ import java.util.List; /** Computes an approximation to pi */ public final class JavaSparkPi { - private JavaSparkPi() { - } - public static void main(String[] args) throws Exception { if (args.length == 0) { System.err.println("Usage: JavaLogQuery [slices]"); diff --git a/examples/src/main/java/org/apache/spark/examples/JavaTC.java b/examples/src/main/java/org/apache/spark/examples/JavaTC.java index 12b564d1ef..2ceb0fd94b 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaTC.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaTC.java @@ -37,9 +37,6 @@ public final class JavaTC { private static final int numVertices = 100; private static final Random rand = new Random(42); - private JavaTC() { - } - static List> generateGraph() { Set> edges = new HashSet>(numEdges); while (edges.size() < numEdges) { diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java index fc9beb8fe5..6651f98d56 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java @@ -32,9 +32,6 @@ import java.util.regex.Pattern; public final class JavaWordCount { private static final Pattern SPACE = Pattern.compile(" "); - private JavaWordCount() { - } - public static void main(String[] args) throws Exception { if (args.length < 2) { System.err.println("Usage: JavaWordCount "); diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java index c42d9cb788..435a86e62a 100644 --- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java +++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java @@ -33,10 +33,7 @@ import scala.Tuple2; /** * Example using MLLib ALS from Java. */ -public final class JavaALS { - - private JavaALS() { - } +public final class JavaALS { static class ParseRating extends Function { private static final Pattern COMMA = Pattern.compile(","); diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java index 9d10473aed..4b2658f257 100644 --- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java +++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java @@ -32,9 +32,6 @@ import java.util.regex.Pattern; */ public final class JavaKMeans { - private JavaKMeans() { - } - static class ParsePoint extends Function { private static final Pattern SPACE = Pattern.compile(" "); diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java index b057f71e08..21586ce817 100644 --- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java +++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java @@ -34,9 +34,6 @@ import java.util.regex.Pattern; */ public final class JavaLR { - private JavaLR() { - } - static class ParsePoint extends Function { private static final Pattern COMMA = Pattern.compile(","); private static final Pattern SPACE = Pattern.compile(" "); -- cgit v1.2.3