aboutsummaryrefslogtreecommitdiff
path: root/examples/src
diff options
context:
space:
mode:
authorstayhf <chutong88@gmail.com>2013-08-05 00:30:28 +0000
committerstayhf <chutong88@gmail.com>2013-08-05 00:30:28 +0000
commit98fd62605daaf4e74dfffef1e7bc57ac131a61bb (patch)
tree9cb1f5c51ae8fb33a1796ca8ac5765cb5d5c91f9 /examples/src
parenta6826373016f415169316024373c604089d2be31 (diff)
downloadspark-98fd62605daaf4e74dfffef1e7bc57ac131a61bb.tar.gz
spark-98fd62605daaf4e74dfffef1e7bc57ac131a61bb.tar.bz2
spark-98fd62605daaf4e74dfffef1e7bc57ac131a61bb.zip
Updated code with reviewer's suggestions
Diffstat (limited to 'examples/src')
-rw-r--r--examples/src/main/java/spark/examples/JavaPageRank.java94
1 files changed, 47 insertions, 47 deletions
diff --git a/examples/src/main/java/spark/examples/JavaPageRank.java b/examples/src/main/java/spark/examples/JavaPageRank.java
index c491c86a6f..b57797a9fb 100644
--- a/examples/src/main/java/spark/examples/JavaPageRank.java
+++ b/examples/src/main/java/spark/examples/JavaPageRank.java
@@ -61,47 +61,47 @@ public class JavaPageRank {
System.exit(1);
}
- JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaPageRank",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
-
- // Loads in input file. It should be in format of:
- // URL neighbor URL
- // URL neighbor URL
- // URL neighbor URL
- // ...
- JavaRDD<String> lines = ctx.textFile(args[1], 1).cache();
-
- // Loads all URLs from input file and initialize their neighbors.
- JavaPairRDD<String, List<String>> links = lines.map(new PairFunction<String, String, String>() {
- @Override
- public Tuple2<String, String> call(String s) {
- return new Tuple2<String, String>(s.split("\\s+")[0], s.split("\\s+")[1]);
- }
- }).distinct().groupByKey();
-
- // Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
- JavaPairRDD<String, Double> ranks = lines.map(new PairFunction<String, String, Double>() {
- @Override
- public Tuple2<String, Double> call(String s) {
- return new Tuple2<String, Double>(s.split("\\s+")[1], 1.0);
- }
- }).distinct();
-
+ JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaPageRank",
+ System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+
+ // Loads in input file. It should be in format of:
+ // URL neighbor URL
+ // URL neighbor URL
+ // URL neighbor URL
+ // ...
+ JavaRDD<String> lines = ctx.textFile(args[1], 1);
+
+ // Loads all URLs from input file and initialize their neighbors.
+ JavaPairRDD<String, List<String>> links = lines.map(new PairFunction<String, String, String>() {
+ @Override
+ public Tuple2<String, String> call(String s) {
+ String[] parts = s.split("\\s+");
+ return new Tuple2<String, String>(parts[0], parts[1]);
+ }
+ }).distinct().groupByKey().cache();
- // Calculates and updates URL ranks continuously using PageRank algorithm.
- for (int current = 0; current < Integer.parseInt(args[2]); current++) {
- // Calculates URL contributions to the rank of other URLs.
- JavaPairRDD<String, Double> contribs = links.join(ranks).values()
- .flatMap(new FlatMapFunction<Tuple2<List<String>, Double>, Tuple2<String, Double>>() {
- @Override
- public Iterable<Tuple2<String, Double>> call(Tuple2<List<String>, Double> s) {
- List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>();
- for (String n : s._1) {
- results.add(new Tuple2<String, Double>(n, s._2 / s._1.size()));
+ // Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
+ JavaPairRDD<String, Double> ranks = links.mapValues(new Function<List<String>, Double>() {
+ @Override
+ public Double call(List<String> rs) throws Exception {
+ return 1.0;
+ }
+ });
+
+ // Calculates and updates URL ranks continuously using PageRank algorithm.
+ for (int current = 0; current < Integer.parseInt(args[2]); current++) {
+ // Calculates URL contributions to the rank of other URLs.
+ JavaPairRDD<String, Double> contribs = links.join(ranks).values()
+ .flatMap(new FlatMapFunction<Tuple2<List<String>, Double>, Tuple2<String, Double>>() {
+ @Override
+ public Iterable<Tuple2<String, Double>> call(Tuple2<List<String>, Double> s) {
+ List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>();
+ for (String n : s._1) {
+ results.add(new Tuple2<String, Double>(n, s._2 / s._1.size()));
+ }
+
+ return results;
}
-
- return results;
- }
}).map(new PairFunction<Tuple2<String, Double>, String, Double>() {
@Override
public Tuple2<String, Double> call(Tuple2<String, Double> s) {
@@ -115,15 +115,15 @@ public class JavaPageRank {
public Double call(List<Double> cs) throws Exception {
return 0.15 + sum(cs) * 0.85;
}
- });
- }
+ });
+ }
- // Collects all URL ranks and dump them to console.
- List<Tuple2<String, Double>> output = ranks.collect();
- for (Tuple2 tuple : output) {
- System.out.println(tuple._1 + " has rank: " + round((Double)(tuple._2), 2) + ".");
- }
+ // Collects all URL ranks and dump them to console.
+ List<Tuple2<String, Double>> output = ranks.collect();
+ for (Tuple2 tuple : output) {
+ System.out.println(tuple._1 + " has rank: " + round((Double)(tuple._2), 2) + ".");
+ }
- System.exit(0);
+ System.exit(0);
}
}