diff options
Diffstat (limited to 'examples')
-rw-r--r-- | examples/src/main/java/org/apache/spark/examples/JavaPageRank.java | 21 | ||||
-rw-r--r-- | examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala | 14 |
2 files changed, 22 insertions, 13 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java index eb70fb5475..8513ba07e7 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java @@ -17,7 +17,10 @@ package org.apache.spark.examples; + import scala.Tuple2; + +import com.google.common.collect.Iterables; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -26,8 +29,9 @@ import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.api.java.function.PairFunction; -import java.util.List; import java.util.ArrayList; +import java.util.List; +import java.util.Iterator; import java.util.regex.Pattern; /** @@ -66,7 +70,7 @@ public final class JavaPageRank { JavaRDD<String> lines = ctx.textFile(args[1], 1); // Loads all URLs from input file and initialize their neighbors. - JavaPairRDD<String, List<String>> links = lines.mapToPair(new PairFunction<String, String, String>() { + JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(new PairFunction<String, String, String>() { @Override public Tuple2<String, String> call(String s) { String[] parts = SPACES.split(s); @@ -75,9 +79,9 @@ public final class JavaPageRank { }).distinct().groupByKey().cache(); // Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one. - JavaPairRDD<String, Double> ranks = links.mapValues(new Function<List<String>, Double>() { + JavaPairRDD<String, Double> ranks = links.mapValues(new Function<Iterable<String>, Double>() { @Override - public Double call(List<String> rs) { + public Double call(Iterable<String> rs) { return 1.0; } }); @@ -86,12 +90,13 @@ public final class JavaPageRank { for (int current = 0; current < Integer.parseInt(args[2]); current++) { // Calculates URL contributions to the rank of other URLs. JavaPairRDD<String, Double> contribs = links.join(ranks).values() - .flatMapToPair(new PairFlatMapFunction<Tuple2<List<String>, Double>, String, Double>() { + .flatMapToPair(new PairFlatMapFunction<Tuple2<Iterable<String>, Double>, String, Double>() { @Override - public Iterable<Tuple2<String, Double>> call(Tuple2<List<String>, Double> s) { + public Iterable<Tuple2<String, Double>> call(Tuple2<Iterable<String>, Double> s) { + int urlCount = Iterables.size(s._1); List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>(); - for (String n : s._1()) { - results.add(new Tuple2<String, Double>(n, s._2() / s._1().size())); + for (String n : s._1) { + results.add(new Tuple2<String, Double>(n, s._2() / urlCount)); } return results; } diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala index 27afa6b642..7aac6a1359 100644 --- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala +++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala @@ -115,12 +115,16 @@ object WikipediaPageRankStandalone { var ranks = links.mapValues { edges => defaultRank } for (i <- 1 to numIterations) { val contribs = links.groupWith(ranks).flatMap { - case (id, (linksWrapper, rankWrapper)) => - if (linksWrapper.length > 0) { - if (rankWrapper.length > 0) { - linksWrapper(0).map(dest => (dest, rankWrapper(0) / linksWrapper(0).size)) + case (id, (linksWrapperIterable, rankWrapperIterable)) => + val linksWrapper = linksWrapperIterable.iterator + val rankWrapper = rankWrapperIterable.iterator + if (linksWrapper.hasNext) { + val linksWrapperHead = linksWrapper.next + if (rankWrapper.hasNext) { + val rankWrapperHead = rankWrapper.next + linksWrapperHead.map(dest => (dest, rankWrapperHead / linksWrapperHead.size)) } else { - linksWrapper(0).map(dest => (dest, defaultRank / linksWrapper(0).size)) + linksWrapperHead.map(dest => (dest, defaultRank / linksWrapperHead.size)) } } else { Array[(String, Double)]() |