diff options
author | Matei Zaharia <matei.zaharia@gmail.com> | 2013-08-10 18:06:23 -0700 |
---|---|---|
committer | Matei Zaharia <matei.zaharia@gmail.com> | 2013-08-10 18:06:23 -0700 |
commit | 06e4f2a8f20e1af837b0e0a520979b81fffe8042 (patch) | |
tree | d2f46326dd59f5ae943d339fe686d01302fcae7f /examples | |
parent | 71c63de22fea19fcd3f6a847dfd3d7b6ab597eac (diff) | |
parent | c4eea875ac5d02b46b22b454532c9702c3fa6240 (diff) | |
download | spark-06e4f2a8f20e1af837b0e0a520979b81fffe8042.tar.gz spark-06e4f2a8f20e1af837b0e0a520979b81fffe8042.tar.bz2 spark-06e4f2a8f20e1af837b0e0a520979b81fffe8042.zip |
Merge pull request #789 from MLnick/master
Adding Scala version of PageRank example
Diffstat (limited to 'examples')
-rw-r--r-- | examples/src/main/scala/spark/examples/SparkPageRank.scala | 50 |
1 files changed, 50 insertions, 0 deletions
diff --git a/examples/src/main/scala/spark/examples/SparkPageRank.scala b/examples/src/main/scala/spark/examples/SparkPageRank.scala new file mode 100644 index 0000000000..4e41c026a4 --- /dev/null +++ b/examples/src/main/scala/spark/examples/SparkPageRank.scala @@ -0,0 +1,50 @@ +package spark.examples + +import spark.SparkContext._ +import spark.SparkContext + + +/** + * Computes the PageRank of URLs from an input file. Input file should + * be in format of: + * URL neighbor URL + * URL neighbor URL + * URL neighbor URL + * ... + * where URL and their neighbors are separated by space(s). + */ +object SparkPageRank { + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: PageRank <master> <file> <number_of_iterations>") + System.exit(1) + } + var iters = args(2).toInt + val ctx = new SparkContext(args(0), "PageRank", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) + val lines = ctx.textFile(args(1), 1) + val links = lines.map{s => + val parts = s.split("\\s+") + (parts(0), parts(1)) + }.distinct().groupByKey().cache() + var ranks = links.mapValues(v => 1.0) + + for (i <- 1 to iters) { + val contribs = links.join(ranks).values.flatMap{ case (urls, rank) => + val size = urls.size + urls.map(url => (url, rank / size)) + } + + ranks = contribs.groupByKey().mapValues{ranks => + val sumRanks = ranks.foldLeft(0.0)(_ + _) + 0.15 + sumRanks * 0.85 + } + } + + val output = ranks.collect() + output.foreach(tup => println(tup._1 + " has rank: " + tup._2 + ".")) + + ctx.stop() + System.exit(0) + } +} + |