author     Matei Zaharia <matei.zaharia@gmail.com>  2013-08-12 21:26:59 -0700
committer  Matei Zaharia <matei.zaharia@gmail.com>  2013-08-12 21:26:59 -0700
commit     e2fdac60da8cb9b0ff0191631bf7e37ad3a47c76 (patch)
tree       aa2ad899e1156de1ebec57c32fa3689e3902d761 /python
parent     d3525babeee713cf386e0c013f352ab767993623 (diff)
parent     24f02082c7cb6c259b21495bed074af19ce1d374 (diff)
Merge pull request #802 from stayhf/SPARK-760-Python
Simple PageRank algorithm implementation in Python for SPARK-760
Diffstat (limited to 'python')
-rwxr-xr-x  python/examples/pagerank.py  74
1 file changed, 74 insertions(+), 0 deletions(-)
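The new example implements the classic iterative PageRank: on each pass every URL splits its current rank evenly among its outgoing links, and every URL's new rank is a damped sum of the contributions it receives (rank' = 0.15 + 0.85 * sum(contribs), matching the constants in the patch). A rough, dependency-free Python sketch of the same loop, using a made-up three-node graph and iteration count:

    from collections import defaultdict

    # Hypothetical graph: url -> list of urls it links to.
    links = {'a': ['b', 'c'], 'b': ['c'], 'c': ['a']}
    # Every url starts with rank 1.0, as in the patch.
    ranks = dict((url, 1.0) for url in links)

    for _ in range(10):
        contribs = defaultdict(float)
        for url, neighbors in links.items():
            # Like computeContribs(): split this url's rank evenly
            # across the urls it links to.
            for neighbor in neighbors:
                contribs[neighbor] += ranks[url] / len(neighbors)
        # Like reduceByKey(add).mapValues(...): damp the summed contributions.
        ranks = dict((url, 0.15 + 0.85 * c) for url, c in contribs.items())

    for url, rank in ranks.items():
        print("%s has rank: %s." % (url, rank))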
diff --git a/python/examples/pagerank.py b/python/examples/pagerank.py
new file mode 100755
index 0000000000..cd774cf3a3
--- /dev/null
+++ b/python/examples/pagerank.py
@@ -0,0 +1,74 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import re, sys
+from operator import add
+
+from pyspark import SparkContext
+
+
+def computeContribs(urls, rank):
+ """Calculates URL contributions to the rank of other URLs."""
+ num_urls = len(urls)
+ for url in urls: yield (url, rank / num_urls)
+
+
+def parseNeighbors(urls):
+    """Parses a 'url neighbor_url' pair string into a (url, neighbor_url) tuple."""
+ parts = re.split(r'\s+', urls)
+ return parts[0], parts[1]
+
+
+if __name__ == "__main__":
+    if len(sys.argv) < 4:
+        print >> sys.stderr, "Usage: pagerank <master> <file> <number_of_iterations>"
+        sys.exit(-1)
+
+    # Initializes the Spark context.
+ sc = SparkContext(sys.argv[1], "PythonPageRank")
+
+    # Loads the input file, where each line holds a pair of the form:
+ # URL neighbor URL
+ # URL neighbor URL
+ # URL neighbor URL
+ # ...
+ lines = sc.textFile(sys.argv[2], 1)
+
+    # Loads all URLs from the input file and initializes each URL's neighbor list.
+ links = lines.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey().cache()
+
+    # Initializes the rank of every URL to 1.0.
+ ranks = links.map(lambda (url, neighbors): (url, 1.0))
+
+    # Iteratively calculates and updates URL ranks using the PageRank algorithm.
+ for iteration in xrange(int(sys.argv[3])):
+ # Calculates URL contributions to the rank of other URLs.
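+        # links.join(ranks) pairs each url's neighbor list with its current
+        # rank, producing (url, (urls, rank)) tuples consumed by the flatMap.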
+ contribs = links.join(ranks).flatMap(lambda (url, (urls, rank)):
+ computeContribs(urls, rank))
+
+ # Re-calculates URL ranks based on neighbor contributions.
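+        # 0.85 is the damping factor: each url keeps a base rank of 0.15 and
+        # receives 85% of the rank contributed by its in-links.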
+ ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
+
+    # Collects all URL ranks and dumps them to the console.
+ for (link, rank) in ranks.collect():
+ print "%s has rank: %s." % (link, rank)
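
For reference, a sketch of how the new example might be invoked; the master string, file name, and edge list below are hypothetical:

    $ cat urls.txt
    a.html b.html
    a.html c.html
    b.html c.html
    c.html a.html
    $ ./python/examples/pagerank.py local urls.txt 10

Each output line then has the form "<url> has rank: <rank>.", with one line per URL that receives at least one incoming link.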