aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
author Matei Zaharia <matei@eecs.berkeley.edu> 2013-08-10 18:09:54 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu> 2013-08-10 18:09:54 -0700
commit 4c4f769187e468466702256e1e66bbc0c6178ef9 (patch)
tree d68d686087b51434151c4c155f5e5a7c9e3b5d8d /examples
parent 06e4f2a8f20e1af837b0e0a520979b81fffe8042 (diff)
download spark-4c4f769187e468466702256e1e66bbc0c6178ef9.tar.gz
spark-4c4f769187e468466702256e1e66bbc0c6178ef9.tar.bz2
spark-4c4f769187e468466702256e1e66bbc0c6178ef9.zip
Optimize Scala PageRank to use reduceByKey
Diffstat (limited to 'examples')
-rw-r--r-- examples/src/main/scala/spark/examples/SparkPageRank.scala 12
1 files changed, 4 insertions, 8 deletions
diff --git a/examples/src/main/scala/spark/examples/SparkPageRank.scala b/examples/src/main/scala/spark/examples/SparkPageRank.scala
index 4e41c026a4..dedbbd01a3 100644
--- a/examples/src/main/scala/spark/examples/SparkPageRank.scala
+++ b/examples/src/main/scala/spark/examples/SparkPageRank.scala
@@ -20,9 +20,10 @@ object SparkPageRank {
System.exit(1)
}
var iters = args(2).toInt
- val ctx = new SparkContext(args(0), "PageRank", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ val ctx = new SparkContext(args(0), "PageRank",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val lines = ctx.textFile(args(1), 1)
- val links = lines.map{s =>
+ val links = lines.map{ s =>
val parts = s.split("\\s+")
(parts(0), parts(1))
}.distinct().groupByKey().cache()
@@ -33,17 +34,12 @@ object SparkPageRank {
val size = urls.size
urls.map(url => (url, rank / size))
}
-
- ranks = contribs.groupByKey().mapValues{ranks =>
- val sumRanks = ranks.foldLeft(0.0)(_ + _)
- 0.15 + sumRanks * 0.85
- }
+ ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
}
val output = ranks.collect()
output.foreach(tup => println(tup._1 + " has rank: " + tup._2 + "."))
- ctx.stop()
System.exit(0)
}
}