Diffstat (limited to 'pyspark/examples/transitive_closure.py')
-rw-r--r--  pyspark/examples/transitive_closure.py  50
1 file changed, 0 insertions, 50 deletions
diff --git a/pyspark/examples/transitive_closure.py b/pyspark/examples/transitive_closure.py
deleted file mode 100644
index 73f7f8fbaf..0000000000
--- a/pyspark/examples/transitive_closure.py
+++ /dev/null
@@ -1,50 +0,0 @@
-import sys
-from random import Random
-
-from pyspark import SparkContext
-
-numEdges = 200
-numVertices = 100
-rand = Random(42)
-
-
-def generateGraph():
- edges = set()
- while len(edges) < numEdges:
-        src = rand.randrange(0, numVertices)
-        dst = rand.randrange(0, numVertices)
- if src != dst:
- edges.add((src, dst))
- return edges
-
-
-if __name__ == "__main__":
- if len(sys.argv) == 1:
- print >> sys.stderr, \
- "Usage: PythonTC <master> [<slices>]"
- exit(-1)
- sc = SparkContext(sys.argv[1], "PythonTC")
-    slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
- tc = sc.parallelize(generateGraph(), slices).cache()
-
- # Linear transitive closure: each round grows paths by one edge,
- # by joining the graph's edges with the already-discovered paths.
- # e.g. join the path (y, z) from the TC with the edge (x, y) from
- # the graph to obtain the path (x, z).
-
- # Because join() joins on keys, the edges are stored in reversed order.
- edges = tc.map(lambda (x, y): (y, x))
-
- oldCount = 0L
- nextCount = tc.count()
- while True:
- oldCount = nextCount
- # Perform the join, obtaining an RDD of (y, (z, x)) pairs,
- # then project the result to obtain the new (x, z) paths.
- new_edges = tc.join(edges).map(lambda (_, (a, b)): (b, a))
- tc = tc.union(new_edges).distinct().cache()
- nextCount = tc.count()
- if nextCount == oldCount:
- break
-
- print "TC has %i edges" % tc.count()
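
For reference, the fixed-point iteration that the deleted example runs on Spark can be sketched without any cluster machinery. The following is a minimal, Spark-free illustration of the same join-then-project step; the function name transitive_closure and the by_src index are hypothetical helpers for this sketch, not part of the original example.

def transitive_closure(edges):
    # Paths discovered so far; starts as the raw edge set.
    tc = set(edges)
    # A join pairs elements on a shared key, so each edge (x, y) is stored
    # reversed as (y, x): joined with a path (y, z) it yields the path (x, z).
    rev = [(y, x) for (x, y) in edges]
    while True:
        old_count = len(tc)
        # Index the current paths by their source vertex (the join key).
        by_src = {}
        for (y, z) in tc:
            by_src.setdefault(y, []).append(z)
        # "Join" on y, then project (y, (z, x)) down to the new path (x, z).
        tc |= {(x, z) for (y, x) in rev for z in by_src.get(y, [])}
        # Fixed point: no new paths were discovered this round.
        if len(tc) == old_count:
            return tc

On a toy graph, transitive_closure({(1, 2), (2, 3), (3, 4)}) returns all six reachable pairs, mirroring what the Spark version counts once its loop converges.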