diff options
author | stayhf <chutong88@gmail.com> | 2013-08-05 00:30:28 +0000 |
---|---|---|
committer | stayhf <chutong88@gmail.com> | 2013-08-05 00:30:28 +0000 |
commit | 98fd62605daaf4e74dfffef1e7bc57ac131a61bb (patch) | |
tree | 9cb1f5c51ae8fb33a1796ca8ac5765cb5d5c91f9 /examples | |
parent | a6826373016f415169316024373c604089d2be31 (diff) | |
download | spark-98fd62605daaf4e74dfffef1e7bc57ac131a61bb.tar.gz spark-98fd62605daaf4e74dfffef1e7bc57ac131a61bb.tar.bz2 spark-98fd62605daaf4e74dfffef1e7bc57ac131a61bb.zip |
Updated code with reviewer's suggestions
Diffstat (limited to 'examples')
-rw-r--r-- | examples/src/main/java/spark/examples/JavaPageRank.java | 94 |
1 files changed, 47 insertions, 47 deletions
diff --git a/examples/src/main/java/spark/examples/JavaPageRank.java b/examples/src/main/java/spark/examples/JavaPageRank.java index c491c86a6f..b57797a9fb 100644 --- a/examples/src/main/java/spark/examples/JavaPageRank.java +++ b/examples/src/main/java/spark/examples/JavaPageRank.java @@ -61,47 +61,47 @@ public class JavaPageRank { System.exit(1); } - JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaPageRank", - System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); - - // Loads in input file. It should be in format of: - // URL neighbor URL - // URL neighbor URL - // URL neighbor URL - // ... - JavaRDD<String> lines = ctx.textFile(args[1], 1).cache(); - - // Loads all URLs from input file and initialize their neighbors. - JavaPairRDD<String, List<String>> links = lines.map(new PairFunction<String, String, String>() { - @Override - public Tuple2<String, String> call(String s) { - return new Tuple2<String, String>(s.split("\\s+")[0], s.split("\\s+")[1]); - } - }).distinct().groupByKey(); - - // Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one. - JavaPairRDD<String, Double> ranks = lines.map(new PairFunction<String, String, Double>() { - @Override - public Tuple2<String, Double> call(String s) { - return new Tuple2<String, Double>(s.split("\\s+")[1], 1.0); - } - }).distinct(); - + JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaPageRank", + System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); + + // Loads in input file. It should be in format of: + // URL neighbor URL + // URL neighbor URL + // URL neighbor URL + // ... + JavaRDD<String> lines = ctx.textFile(args[1], 1); + + // Loads all URLs from input file and initialize their neighbors. + JavaPairRDD<String, List<String>> links = lines.map(new PairFunction<String, String, String>() { + @Override + public Tuple2<String, String> call(String s) { + String[] parts = s.split("\\s+"); + return new Tuple2<String, String>(parts[0], parts[1]); + } + }).distinct().groupByKey().cache(); - // Calculates and updates URL ranks continuously using PageRank algorithm. - for (int current = 0; current < Integer.parseInt(args[2]); current++) { - // Calculates URL contributions to the rank of other URLs. - JavaPairRDD<String, Double> contribs = links.join(ranks).values() - .flatMap(new FlatMapFunction<Tuple2<List<String>, Double>, Tuple2<String, Double>>() { - @Override - public Iterable<Tuple2<String, Double>> call(Tuple2<List<String>, Double> s) { - List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>(); - for (String n : s._1) { - results.add(new Tuple2<String, Double>(n, s._2 / s._1.size())); + // Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one. + JavaPairRDD<String, Double> ranks = links.mapValues(new Function<List<String>, Double>() { + @Override + public Double call(List<String> rs) throws Exception { + return 1.0; + } + }); + + // Calculates and updates URL ranks continuously using PageRank algorithm. + for (int current = 0; current < Integer.parseInt(args[2]); current++) { + // Calculates URL contributions to the rank of other URLs. + JavaPairRDD<String, Double> contribs = links.join(ranks).values() + .flatMap(new FlatMapFunction<Tuple2<List<String>, Double>, Tuple2<String, Double>>() { + @Override + public Iterable<Tuple2<String, Double>> call(Tuple2<List<String>, Double> s) { + List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>(); + for (String n : s._1) { + results.add(new Tuple2<String, Double>(n, s._2 / s._1.size())); + } + + return results; } - - return results; - } }).map(new PairFunction<Tuple2<String, Double>, String, Double>() { @Override public Tuple2<String, Double> call(Tuple2<String, Double> s) { @@ -115,15 +115,15 @@ public class JavaPageRank { public Double call(List<Double> cs) throws Exception { return 0.15 + sum(cs) * 0.85; } - }); - } + }); + } - // Collects all URL ranks and dump them to console. - List<Tuple2<String, Double>> output = ranks.collect(); - for (Tuple2 tuple : output) { - System.out.println(tuple._1 + " has rank: " + round((Double)(tuple._2), 2) + "."); - } + // Collects all URL ranks and dump them to console. + List<Tuple2<String, Double>> output = ranks.collect(); + for (Tuple2 tuple : output) { + System.out.println(tuple._1 + " has rank: " + round((Double)(tuple._2), 2) + "."); + } - System.exit(0); + System.exit(0); } } |