aboutsummaryrefslogtreecommitdiff
path: root/examples/src/main/scala
diff options
context:
space:
mode:
Diffstat (limited to 'examples/src/main/scala')
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala14
1 files changed, 9 insertions, 5 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
index 27afa6b642..7aac6a1359 100644
--- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
@@ -115,12 +115,16 @@ object WikipediaPageRankStandalone {
var ranks = links.mapValues { edges => defaultRank }
for (i <- 1 to numIterations) {
val contribs = links.groupWith(ranks).flatMap {
- case (id, (linksWrapper, rankWrapper)) =>
- if (linksWrapper.length > 0) {
- if (rankWrapper.length > 0) {
- linksWrapper(0).map(dest => (dest, rankWrapper(0) / linksWrapper(0).size))
+ case (id, (linksWrapperIterable, rankWrapperIterable)) =>
+ val linksWrapper = linksWrapperIterable.iterator
+ val rankWrapper = rankWrapperIterable.iterator
+ if (linksWrapper.hasNext) {
+ val linksWrapperHead = linksWrapper.next
+ if (rankWrapper.hasNext) {
+ val rankWrapperHead = rankWrapper.next
+ linksWrapperHead.map(dest => (dest, rankWrapperHead / linksWrapperHead.size))
} else {
- linksWrapper(0).map(dest => (dest, defaultRank / linksWrapper(0).size))
+ linksWrapperHead.map(dest => (dest, defaultRank / linksWrapperHead.size))
}
} else {
Array[(String, Double)]()