diff options
author | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2013-01-01 14:48:45 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2013-01-01 15:05:00 -0800 |
commit | b58340dbd9a741331fc4c3829b08c093560056c2 (patch) | |
tree | 52b0e94c47892a8f884b2f80a59ccdb1a428b389 /python/examples/transitive_closure.py | |
parent | 170e451fbdd308ae77065bd9c0f2bd278abf0cb7 (diff) | |
download | spark-b58340dbd9a741331fc4c3829b08c093560056c2.tar.gz spark-b58340dbd9a741331fc4c3829b08c093560056c2.tar.bz2 spark-b58340dbd9a741331fc4c3829b08c093560056c2.zip |
Rename top-level 'pyspark' directory to 'python'
Diffstat (limited to 'python/examples/transitive_closure.py')
-rw-r--r-- | python/examples/transitive_closure.py | 50 |
1 files changed, 50 insertions, 0 deletions
diff --git a/python/examples/transitive_closure.py b/python/examples/transitive_closure.py new file mode 100644 index 0000000000..73f7f8fbaf --- /dev/null +++ b/python/examples/transitive_closure.py @@ -0,0 +1,50 @@ +import sys +from random import Random + +from pyspark import SparkContext + +numEdges = 200 +numVertices = 100 +rand = Random(42) + + +def generateGraph(): + edges = set() + while len(edges) < numEdges: + src = rand.randrange(0, numEdges) + dst = rand.randrange(0, numEdges) + if src != dst: + edges.add((src, dst)) + return edges + + +if __name__ == "__main__": + if len(sys.argv) == 1: + print >> sys.stderr, \ + "Usage: PythonTC <master> [<slices>]" + exit(-1) + sc = SparkContext(sys.argv[1], "PythonTC") + slices = sys.argv[2] if len(sys.argv) > 2 else 2 + tc = sc.parallelize(generateGraph(), slices).cache() + + # Linear transitive closure: each round grows paths by one edge, + # by joining the graph's edges with the already-discovered paths. + # e.g. join the path (y, z) from the TC with the edge (x, y) from + # the graph to obtain the path (x, z). + + # Because join() joins on keys, the edges are stored in reversed order. + edges = tc.map(lambda (x, y): (y, x)) + + oldCount = 0L + nextCount = tc.count() + while True: + oldCount = nextCount + # Perform the join, obtaining an RDD of (y, (z, x)) pairs, + # then project the result to obtain the new (x, z) paths. + new_edges = tc.join(edges).map(lambda (_, (a, b)): (b, a)) + tc = tc.union(new_edges).distinct().cache() + nextCount = tc.count() + if nextCount == oldCount: + break + + print "TC has %i edges" % tc.count() |