From 55d9bde2faddee7205acec239203ccdd482fcf6d Mon Sep 17 00:00:00 2001 From: stayhf Date: Sat, 10 Aug 2013 23:48:51 +0000 Subject: Simple PageRank algorithm implementation in Python for SPARK-760 --- python/examples/pagerank.py | 68 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100755 python/examples/pagerank.py (limited to 'python') diff --git a/python/examples/pagerank.py b/python/examples/pagerank.py new file mode 100755 index 0000000000..eff93281ea --- /dev/null +++ b/python/examples/pagerank.py @@ -0,0 +1,68 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +#!/usr/bin/env python + +import re, sys +from operator import add + +from pyspark import SparkContext + + +def _cal_contribs(urls, rank): + """Calculates URL contributions to the rank of other URLs.""" + num_urls = len(urls) + return [[url, rank / num_urls] for url in urls] + + +def _parse_neighbors(urls): + """Parses a urls pair string into urls pair.""" + parts = re.split(r'\s+', urls) + return parts[0], parts[1] + + +if __name__ == "__main__": + if len(sys.argv) < 3: + print >> sys.stderr, "Usage: pagerank " + exit(-1) + + # Initialize the spark context. + sc = SparkContext(sys.argv[1], "PythonPageRank") + + # Loads in input file. It should be in format of: + # URL neighbor URL + # URL neighbor URL + # URL neighbor URL + # ... + lines = sc.textFile(sys.argv[2], 1) + + # Loads all URLs from input file and initialize their neighbors. + links = lines.map(lambda urls: _parse_neighbors(urls)).distinct().groupByKey().cache() + + # Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one. + ranks = links.map(lambda (url, neighbors): (url, 1.0)) + + # Calculates and updates URL ranks continuously using PageRank algorithm. + for iteration in xrange(int(sys.argv[3])): + # Calculates URL contributions to the rank of other URLs. + contribs = links.join(ranks).flatMap(lambda (url, (urls, rank)): _cal_contribs(urls, rank)) + + # Re-calculates URL ranks based on neighbor contributions. + ranks = contribs.reduceByKey(add).map(lambda (url, rank): (url, rank * 0.85 + 0.15)) + + # Collects all URL ranks and dump them to console. + for (link, rank) in ranks.collect(): print "%s has rank: %s." % (link, rank) -- cgit v1.2.3 From 24f02082c7cb6c259b21495bed074af19ce1d374 Mon Sep 17 00:00:00 2001 From: stayhf Date: Sun, 11 Aug 2013 22:54:05 +0000 Subject: Code update for Matei's suggestions --- python/examples/pagerank.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) (limited to 'python') diff --git a/python/examples/pagerank.py b/python/examples/pagerank.py index eff93281ea..cd774cf3a3 100755 --- a/python/examples/pagerank.py +++ b/python/examples/pagerank.py @@ -23,13 +23,13 @@ from operator import add from pyspark import SparkContext -def _cal_contribs(urls, rank): +def computeContribs(urls, rank): """Calculates URL contributions to the rank of other URLs.""" num_urls = len(urls) - return [[url, rank / num_urls] for url in urls] + for url in urls: yield (url, rank / num_urls) -def _parse_neighbors(urls): +def parseNeighbors(urls): """Parses a urls pair string into urls pair.""" parts = re.split(r'\s+', urls) return parts[0], parts[1] @@ -51,7 +51,7 @@ if __name__ == "__main__": lines = sc.textFile(sys.argv[2], 1) # Loads all URLs from input file and initialize their neighbors. - links = lines.map(lambda urls: _parse_neighbors(urls)).distinct().groupByKey().cache() + links = lines.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey().cache() # Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one. ranks = links.map(lambda (url, neighbors): (url, 1.0)) @@ -59,10 +59,12 @@ if __name__ == "__main__": # Calculates and updates URL ranks continuously using PageRank algorithm. for iteration in xrange(int(sys.argv[3])): # Calculates URL contributions to the rank of other URLs. - contribs = links.join(ranks).flatMap(lambda (url, (urls, rank)): _cal_contribs(urls, rank)) + contribs = links.join(ranks).flatMap(lambda (url, (urls, rank)): + computeContribs(urls, rank)) # Re-calculates URL ranks based on neighbor contributions. - ranks = contribs.reduceByKey(add).map(lambda (url, rank): (url, rank * 0.85 + 0.15)) + ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15) # Collects all URL ranks and dump them to console. - for (link, rank) in ranks.collect(): print "%s has rank: %s." % (link, rank) + for (link, rank) in ranks.collect(): + print "%s has rank: %s." % (link, rank) -- cgit v1.2.3