aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
author: Matei Zaharia <matei.zaharia@gmail.com> 2013-08-10 18:06:23 -0700
committer: Matei Zaharia <matei.zaharia@gmail.com> 2013-08-10 18:06:23 -0700
commit: 06e4f2a8f20e1af837b0e0a520979b81fffe8042 (patch)
tree: d2f46326dd59f5ae943d339fe686d01302fcae7f /examples
parent: 71c63de22fea19fcd3f6a847dfd3d7b6ab597eac (diff)
parent: c4eea875ac5d02b46b22b454532c9702c3fa6240 (diff)
download: spark-06e4f2a8f20e1af837b0e0a520979b81fffe8042.tar.gz
spark-06e4f2a8f20e1af837b0e0a520979b81fffe8042.tar.bz2
spark-06e4f2a8f20e1af837b0e0a520979b81fffe8042.zip
Merge pull request #789 from MLnick/master
Adding Scala version of PageRank example
Diffstat (limited to 'examples')
-rw-r--r-- examples/src/main/scala/spark/examples/SparkPageRank.scala 50
1 files changed, 50 insertions, 0 deletions
diff --git a/examples/src/main/scala/spark/examples/SparkPageRank.scala b/examples/src/main/scala/spark/examples/SparkPageRank.scala
new file mode 100644
index 0000000000..4e41c026a4
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SparkPageRank.scala
@@ -0,0 +1,50 @@
+package spark.examples
+
+import spark.SparkContext._
+import spark.SparkContext
+
+
+/**
+ * Computes the PageRank of URLs from an input file. Each input line holds one
+ * directed link: a source URL followed by a neighbor URL that it links to,
+ * separated by whitespace:
+ *   URL  neighbor-URL
+ * Blank lines and lines with fewer than two fields are skipped.
+ *
+ * Usage: PageRank <master> <file> <number_of_iterations>
+ */
+object SparkPageRank {
+  def main(args: Array[String]) {
+    if (args.length < 3) {
+      System.err.println("Usage: PageRank <master> <file> <number_of_iterations>")
+      System.exit(1)
+    }
+    val iters = args(2).toInt
+    val ctx = new SparkContext(args(0), "PageRank", System.getenv("SPARK_HOME"),
+      Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+    try {
+      // Adjacency lists (source URL -> neighbors); cached because every iteration joins on them.
+      val links = ctx.textFile(args(1), 1)
+        .map(_.trim.split("\\s+"))
+        .filter(_.length >= 2) // skip blank/incomplete lines instead of failing on parts(1)
+        .map(parts => (parts(0), parts(1)))
+        .distinct().groupByKey().cache()
+      var ranks = links.mapValues(_ => 1.0) // every source URL starts with rank 1.0
+
+      for (_ <- 1 to iters) {
+        // Each URL splits its current rank evenly among the URLs it links to.
+        val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
+          val size = urls.size
+          urls.map(url => (url, rank / size))
+        }
+        // Sum contributions per URL (reduceByKey combines map-side), then apply the damped update.
+        ranks = contribs.reduceByKey(_ + _).mapValues(sum => 0.15 + sum * 0.85)
+      }
+      ranks.collect().foreach(tup => println(tup._1 + " has rank: " + tup._2 + "."))
+    } finally {
+      ctx.stop() // release the context even when a job fails
+    }
+    System.exit(0)
+  }
+}
+