aboutsummaryrefslogtreecommitdiff
path: root/examples/src
diff options
context:
space:
mode:
Diffstat (limited to 'examples/src')
-rw-r--r--examples/src/main/java/org/apache/spark/examples/JavaPageRank.java21
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala14
2 files changed, 22 insertions, 13 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
index eb70fb5475..8513ba07e7 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -17,7 +17,10 @@
package org.apache.spark.examples;
+
import scala.Tuple2;
+
+import com.google.common.collect.Iterables;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -26,8 +29,9 @@ import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
-import java.util.List;
import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
import java.util.regex.Pattern;
/**
@@ -66,7 +70,7 @@ public final class JavaPageRank {
JavaRDD<String> lines = ctx.textFile(args[1], 1);
// Loads all URLs from input file and initialize their neighbors.
- JavaPairRDD<String, List<String>> links = lines.mapToPair(new PairFunction<String, String, String>() {
+ JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(new PairFunction<String, String, String>() {
@Override
public Tuple2<String, String> call(String s) {
String[] parts = SPACES.split(s);
@@ -75,9 +79,9 @@ public final class JavaPageRank {
}).distinct().groupByKey().cache();
// Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
- JavaPairRDD<String, Double> ranks = links.mapValues(new Function<List<String>, Double>() {
+ JavaPairRDD<String, Double> ranks = links.mapValues(new Function<Iterable<String>, Double>() {
@Override
- public Double call(List<String> rs) {
+ public Double call(Iterable<String> rs) {
return 1.0;
}
});
@@ -86,12 +90,13 @@ public final class JavaPageRank {
for (int current = 0; current < Integer.parseInt(args[2]); current++) {
// Calculates URL contributions to the rank of other URLs.
JavaPairRDD<String, Double> contribs = links.join(ranks).values()
- .flatMapToPair(new PairFlatMapFunction<Tuple2<List<String>, Double>, String, Double>() {
+ .flatMapToPair(new PairFlatMapFunction<Tuple2<Iterable<String>, Double>, String, Double>() {
@Override
- public Iterable<Tuple2<String, Double>> call(Tuple2<List<String>, Double> s) {
+ public Iterable<Tuple2<String, Double>> call(Tuple2<Iterable<String>, Double> s) {
+ int urlCount = Iterables.size(s._1);
List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>();
- for (String n : s._1()) {
- results.add(new Tuple2<String, Double>(n, s._2() / s._1().size()));
+ for (String n : s._1) {
+ results.add(new Tuple2<String, Double>(n, s._2() / urlCount));
}
return results;
}
diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
index 27afa6b642..7aac6a1359 100644
--- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
@@ -115,12 +115,16 @@ object WikipediaPageRankStandalone {
var ranks = links.mapValues { edges => defaultRank }
for (i <- 1 to numIterations) {
val contribs = links.groupWith(ranks).flatMap {
- case (id, (linksWrapper, rankWrapper)) =>
- if (linksWrapper.length > 0) {
- if (rankWrapper.length > 0) {
- linksWrapper(0).map(dest => (dest, rankWrapper(0) / linksWrapper(0).size))
+ case (id, (linksWrapperIterable, rankWrapperIterable)) =>
+ val linksWrapper = linksWrapperIterable.iterator
+ val rankWrapper = rankWrapperIterable.iterator
+ if (linksWrapper.hasNext) {
+ val linksWrapperHead = linksWrapper.next
+ if (rankWrapper.hasNext) {
+ val rankWrapperHead = rankWrapper.next
+ linksWrapperHead.map(dest => (dest, rankWrapperHead / linksWrapperHead.size))
} else {
- linksWrapper(0).map(dest => (dest, defaultRank / linksWrapper(0).size))
+ linksWrapperHead.map(dest => (dest, defaultRank / linksWrapperHead.size))
}
} else {
Array[(String, Double)]()