diff options
author | Sandeep <sandeep@techaddict.me> | 2014-05-06 17:27:52 -0700 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2014-05-06 17:27:52 -0700 |
commit | a000b5c3b0438c17e9973df4832c320210c29c27 (patch) | |
tree | 446bbe902ecd6de05072357f7ef25aaeb0687b73 /examples/src/main/python/transitive_closure.py | |
parent | 39b8b1489ff92697e4aeec997cdc436c7079d6f8 (diff) | |
download | spark-a000b5c3b0438c17e9973df4832c320210c29c27.tar.gz spark-a000b5c3b0438c17e9973df4832c320210c29c27.tar.bz2 spark-a000b5c3b0438c17e9973df4832c320210c29c27.zip |
SPARK-1637: Clean up examples for 1.0
- [x] Move all of them into subpackages of org.apache.spark.examples (right now some are in org.apache.spark.streaming.examples, for instance, and others are in org.apache.spark.examples.mllib)
- [x] Move Python examples into examples/src/main/python
- [x] Update docs to reflect these changes
Author: Sandeep <sandeep@techaddict.me>
This patch had conflicts when merged, resolved by
Committer: Matei Zaharia <matei@databricks.com>
Closes #571 from techaddict/SPARK-1637 and squashes the following commits:
47ef86c [Sandeep] Changes based on Discussions on PR, removing use of RawTextHelper from examples
8ed2d3f [Sandeep] Docs Updated for changes, Change for java examples
5f96121 [Sandeep] Move Python examples into examples/src/main/python
0a8dd77 [Sandeep] Move all Scala Examples to org.apache.spark.examples (some are in org.apache.spark.streaming.examples, for instance, and others are in org.apache.spark.examples.mllib)
Diffstat (limited to 'examples/src/main/python/transitive_closure.py')
-rwxr-xr-x | examples/src/main/python/transitive_closure.py | 66 |
1 files changed, 66 insertions, 0 deletions
diff --git a/examples/src/main/python/transitive_closure.py b/examples/src/main/python/transitive_closure.py new file mode 100755 index 0000000000..744cce6651 --- /dev/null +++ b/examples/src/main/python/transitive_closure.py @@ -0,0 +1,66 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import sys +from random import Random + +from pyspark import SparkContext + +numEdges = 200 +numVertices = 100 +rand = Random(42) + + +def generateGraph(): + edges = set() + while len(edges) < numEdges: + src = rand.randrange(0, numEdges) + dst = rand.randrange(0, numEdges) + if src != dst: + edges.add((src, dst)) + return edges + + +if __name__ == "__main__": + if len(sys.argv) == 1: + print >> sys.stderr, "Usage: transitive_closure <master> [<slices>]" + exit(-1) + sc = SparkContext(sys.argv[1], "PythonTransitiveClosure") + slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2 + tc = sc.parallelize(generateGraph(), slices).cache() + + # Linear transitive closure: each round grows paths by one edge, + # by joining the graph's edges with the already-discovered paths. + # e.g. join the path (y, z) from the TC with the edge (x, y) from + # the graph to obtain the path (x, z). + + # Because join() joins on keys, the edges are stored in reversed order. + edges = tc.map(lambda (x, y): (y, x)) + + oldCount = 0L + nextCount = tc.count() + while True: + oldCount = nextCount + # Perform the join, obtaining an RDD of (y, (z, x)) pairs, + # then project the result to obtain the new (x, z) paths. + new_edges = tc.join(edges).map(lambda (_, (a, b)): (b, a)) + tc = tc.union(new_edges).distinct().cache() + nextCount = tc.count() + if nextCount == oldCount: + break + + print "TC has %i edges" % tc.count() |