author    stayhf <chutong88@gmail.com>  2013-08-11 22:54:05 +0000
committer stayhf <chutong88@gmail.com>  2013-08-11 22:54:05 +0000
commit    24f02082c7cb6c259b21495bed074af19ce1d374 (patch)
tree      5a9de6d96a8bb1754918980de6e58f58cfbdaf0e /python
parent    55d9bde2faddee7205acec239203ccdd482fcf6d (diff)
Code update for Matei's suggestions
Diffstat (limited to 'python')
-rwxr-xr-x python/examples/pagerank.py | 16 +++++++++-------
1 file changed, 9 insertions(+), 7 deletions(-)
diff --git a/python/examples/pagerank.py b/python/examples/pagerank.py
index eff93281ea..cd774cf3a3 100755
--- a/python/examples/pagerank.py
+++ b/python/examples/pagerank.py
@@ -23,13 +23,13 @@ from operator import add
 from pyspark import SparkContext
 
 
-def _cal_contribs(urls, rank):
+def computeContribs(urls, rank):
     """Calculates URL contributions to the rank of other URLs."""
     num_urls = len(urls)
-    return [[url, rank / num_urls] for url in urls]
+    for url in urls: yield (url, rank / num_urls)
 
 
-def _parse_neighbors(urls):
+def parseNeighbors(urls):
     """Parses a urls pair string into urls pair."""
     parts = re.split(r'\s+', urls)
     return parts[0], parts[1]
@@ -51,7 +51,7 @@ if __name__ == "__main__":
     lines = sc.textFile(sys.argv[2], 1)
 
     # Loads all URLs from input file and initialize their neighbors.
-    links = lines.map(lambda urls: _parse_neighbors(urls)).distinct().groupByKey().cache()
+    links = lines.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey().cache()
 
     # Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
     ranks = links.map(lambda (url, neighbors): (url, 1.0))
@@ -59,10 +59,12 @@ if __name__ == "__main__":
     # Calculates and updates URL ranks continuously using PageRank algorithm.
     for iteration in xrange(int(sys.argv[3])):
         # Calculates URL contributions to the rank of other URLs.
-        contribs = links.join(ranks).flatMap(lambda (url, (urls, rank)): _cal_contribs(urls, rank))
+        contribs = links.join(ranks).flatMap(lambda (url, (urls, rank)):
+                computeContribs(urls, rank))
 
         # Re-calculates URL ranks based on neighbor contributions.
-        ranks = contribs.reduceByKey(add).map(lambda (url, rank): (url, rank * 0.85 + 0.15))
+        ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
 
     # Collects all URL ranks and dump them to console.
-    for (link, rank) in ranks.collect(): print "%s has rank: %s." % (link, rank)
+    for (link, rank) in ranks.collect():
+        print "%s has rank: %s." % (link, rank)
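The net effect of the patch is easier to see outside Spark. Below is a minimal, Spark-free sketch of the same update rule in plain Python 3 (the patched example itself is Python 2: print statements, xrange, tuple-unpacking lambdas). The three-page link graph is hypothetical, standing in for the url-pair input file, and ordinary dict loops stand in for the join/flatMap/reduceByKey/mapValues pipeline:

    # Minimal, Spark-free sketch of the PageRank update in this patch.
    # The link graph below is made up for illustration; the real example
    # reads url pairs from a text file via sc.textFile().

    def computeContribs(urls, rank):
        """Yield each neighbor's share of this page's rank (as in the patch)."""
        num_urls = len(urls)
        for url in urls:
            yield (url, rank / num_urls)

    # Hypothetical adjacency list standing in for the `links` RDD.
    links = {
        "a": ["b", "c"],
        "b": ["c"],
        "c": ["a"],
    }
    ranks = {url: 1.0 for url in links}  # mirrors links.map(... (url, 1.0))

    for iteration in range(10):
        # flatMap(computeContribs) + reduceByKey(add), done with a plain dict.
        contribs = {}
        for url, neighbors in links.items():
            for dest, contrib in computeContribs(neighbors, ranks[url]):
                contribs[dest] = contribs.get(dest, 0.0) + contrib
        # mapValues(lambda rank: rank * 0.85 + 0.15), the damping step.
        ranks = {url: rank * 0.85 + 0.15 for url, rank in contribs.items()}

    for link, rank in sorted(ranks.items()):
        print("%s has rank: %s." % (link, rank))

The generator change matters for flatMap: computeContribs now yields pairs lazily instead of materializing a per-page list. Likewise, mapValues makes it explicit that only the rank changes and the key does not, which in Spark also lets the resulting RDD keep its partitioning.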