diff options
author | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2013-01-01 13:52:14 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2013-01-01 13:52:14 -0800 |
commit | 170e451fbdd308ae77065bd9c0f2bd278abf0cb7 (patch) | |
tree | da3df59e2262dac4b381227d5bc712502249d746 /pyspark/examples/transitive_closure.py | |
parent | 6f6a6b79c4c3f3555f8ff427c91e714d02afe8fa (diff) | |
download | spark-170e451fbdd308ae77065bd9c0f2bd278abf0cb7.tar.gz spark-170e451fbdd308ae77065bd9c0f2bd278abf0cb7.tar.bz2 spark-170e451fbdd308ae77065bd9c0f2bd278abf0cb7.zip |
Minor documentation and style fixes for PySpark.
Diffstat (limited to 'pyspark/examples/transitive_closure.py')
-rw-r--r-- | pyspark/examples/transitive_closure.py | 50 |
1 files changed, 50 insertions, 0 deletions
# pyspark/examples/transitive_closure.py (new file, mode 100644)
"""Compute the transitive closure of a randomly generated directed graph.

Usage: PythonTC <master> [<slices>]
"""
import sys
from random import Random

numEdges = 200
numVertices = 100
rand = Random(42)  # fixed seed, so every run generates the same graph


def generateGraph():
    """Return a set of ``numEdges`` distinct directed edges ``(src, dst)``.

    Both endpoints are drawn from ``range(numVertices)`` using the
    module-level ``rand`` generator; self-loops (``src == dst``) are
    rejected and duplicates collapse because the result is a set.
    """
    edges = set()
    while len(edges) < numEdges:
        # Vertex ids are bounded by the number of vertices, not of edges.
        src = rand.randrange(0, numVertices)
        dst = rand.randrange(0, numVertices)
        if src != dst:
            edges.add((src, dst))
    return edges


if __name__ == "__main__":
    if len(sys.argv) == 1:
        sys.stderr.write("Usage: PythonTC <master> [<slices>]\n")
        sys.exit(-1)

    # Imported only when run as a script, so that generateGraph() stays
    # importable on machines without a Spark installation.
    from pyspark import SparkContext

    sc = SparkContext(sys.argv[1], "PythonTC")
    # Command-line arguments are strings; the slice count must be an int.
    slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
    tc = sc.parallelize(generateGraph(), slices).cache()

    # Linear transitive closure: each round grows paths by one edge,
    # by joining the graph's edges with the already-discovered paths.
    # e.g. join the path (y, z) from the TC with the edge (x, y) from
    # the graph to obtain the path (x, z).

    # Because join() joins on keys, the edges are stored in reversed order.
    edges = tc.map(lambda edge: (edge[1], edge[0]))

    nextCount = tc.count()
    while True:
        oldCount = nextCount
        # Perform the join, obtaining an RDD of (y, (z, x)) pairs,
        # then project the result to obtain the new (x, z) paths.
        new_edges = tc.join(edges).map(lambda kv: (kv[1][1], kv[1][0]))
        tc = tc.union(new_edges).distinct().cache()
        nextCount = tc.count()
        # A round that discovers no new path means the closure is complete.
        if nextCount == oldCount:
            break

    print("TC has %i edges" % tc.count())