From 170e451fbdd308ae77065bd9c0f2bd278abf0cb7 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Tue, 1 Jan 2013 13:52:14 -0800
Subject: Minor documentation and style fixes for PySpark.

---
 pyspark/examples/kmeans.py              | 13 +++++---
 pyspark/examples/logistic_regression.py | 57 +++++++++++++++++++++++++++++++++
 pyspark/examples/lr.py                  | 57 ---------------------------------
 pyspark/examples/pi.py                  |  5 +--
 pyspark/examples/tc.py                  | 49 ------------------------------
 pyspark/examples/transitive_closure.py  | 50 ++++++++++++++++++++++++++++++
 pyspark/examples/wordcount.py           |  4 ++-
 pyspark/pyspark/__init__.py             | 13 +++++++-
 8 files changed, 133 insertions(+), 115 deletions(-)
 create mode 100755 pyspark/examples/logistic_regression.py
 delete mode 100755 pyspark/examples/lr.py
 delete mode 100644 pyspark/examples/tc.py
 create mode 100644 pyspark/examples/transitive_closure.py

diff --git a/pyspark/examples/kmeans.py b/pyspark/examples/kmeans.py
index 9cc366f03c..ad2be21178 100644
--- a/pyspark/examples/kmeans.py
+++ b/pyspark/examples/kmeans.py
@@ -1,18 +1,21 @@
+"""
+This example requires numpy (http://www.numpy.org/)
+"""
 import sys
 
-from pyspark.context import SparkContext
-from numpy import array, sum as np_sum
+import numpy as np
+from pyspark import SparkContext
 
 
 def parseVector(line):
-    return array([float(x) for x in line.split(' ')])
+    return np.array([float(x) for x in line.split(' ')])
 
 
 def closestPoint(p, centers):
     bestIndex = 0
     closest = float("+inf")
     for i in range(len(centers)):
-        tempDist = np_sum((p - centers[i]) ** 2)
+        tempDist = np.sum((p - centers[i]) ** 2)
         if tempDist < closest:
             closest = tempDist
             bestIndex = i
@@ -41,7 +44,7 @@ if __name__ == "__main__":
         newPoints = pointStats.map(
             lambda (x, (y, z)): (x, y / z)).collect()
 
-        tempDist = sum(np_sum((kPoints[x] - y) ** 2) for (x, y) in newPoints)
+        tempDist = sum(np.sum((kPoints[x] - y) ** 2) for (x, y) in newPoints)
 
         for (x, y) in newPoints:
             kPoints[x] = y
diff --git a/pyspark/examples/logistic_regression.py b/pyspark/examples/logistic_regression.py
new file mode 100755
index 0000000000..f13698a86f
--- /dev/null
+++ b/pyspark/examples/logistic_regression.py
@@ -0,0 +1,57 @@
+"""
+This example requires numpy (http://www.numpy.org/)
+"""
+from collections import namedtuple
+from math import exp
+from os.path import realpath
+import sys
+
+import numpy as np
+from pyspark import SparkContext
+
+
+N = 100000  # Number of data points
+D = 10  # Number of dimensions
+R = 0.7  # Scaling factor
+ITERATIONS = 5
+np.random.seed(42)
+
+
+DataPoint = namedtuple("DataPoint", ['x', 'y'])
+from lr import DataPoint  # So that DataPoint is properly serialized
+
+
+def generateData():
+    def generatePoint(i):
+        y = -1 if i % 2 == 0 else 1
+        x = np.random.normal(size=D) + (y * R)
+        return DataPoint(x, y)
+    return [generatePoint(i) for i in range(N)]
+
+
+if __name__ == "__main__":
+    if len(sys.argv) == 1:
+        print >> sys.stderr, \
+            "Usage: PythonLR []"
+        exit(-1)
+    sc = SparkContext(sys.argv[1], "PythonLR", pyFiles=[realpath(__file__)])
+    slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
+    points = sc.parallelize(generateData(), slices).cache()
+
+    # Initialize w to a random value
+    w = 2 * np.random.ranf(size=D) - 1
+    print "Initial w: " + str(w)
+
+    def add(x, y):
+        x += y
+        return x
+
+    for i in range(1, ITERATIONS + 1):
+        print "On iteration %i" % i
+
+        gradient = points.map(lambda p:
+            (1.0 / (1.0 + exp(-p.y * np.dot(w, p.x)))) * p.y * p.x
+        ).reduce(add)
+        w -= gradient
+
+    print "Final w: " + str(w)
diff --git a/pyspark/examples/lr.py b/pyspark/examples/lr.py
deleted file mode 100755
index 5fca0266b8..0000000000
--- a/pyspark/examples/lr.py
+++ /dev/null
@@ -1,57 +0,0 @@
-"""
-This example requires numpy (http://www.numpy.org/)
-"""
-from collections import namedtuple
-from math import exp
-from os.path import realpath
-import sys
-
-import numpy as np
-from pyspark.context import SparkContext
-
-
-N = 100000  # Number of data points
-D = 10  # Number of dimensions
-R = 0.7  # Scaling factor
-ITERATIONS = 5
-np.random.seed(42)
-
-
-DataPoint = namedtuple("DataPoint", ['x', 'y'])
-from lr import DataPoint  # So that DataPoint is properly serialized
-
-
-def generateData():
-    def generatePoint(i):
-        y = -1 if i % 2 == 0 else 1
-        x = np.random.normal(size=D) + (y * R)
-        return DataPoint(x, y)
-    return [generatePoint(i) for i in range(N)]
-
-
-if __name__ == "__main__":
-    if len(sys.argv) == 1:
-        print >> sys.stderr, \
-            "Usage: PythonLR []"
-        exit(-1)
-    sc = SparkContext(sys.argv[1], "PythonLR", pyFiles=[realpath(__file__)])
-    slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
-    points = sc.parallelize(generateData(), slices).cache()
-
-    # Initialize w to a random value
-    w = 2 * np.random.ranf(size=D) - 1
-    print "Initial w: " + str(w)
-
-    def add(x, y):
-        x += y
-        return x
-
-    for i in range(1, ITERATIONS + 1):
-        print "On iteration %i" % i
-
-        gradient = points.map(lambda p:
-            (1.0 / (1.0 + exp(-p.y * np.dot(w, p.x)))) * p.y * p.x
-        ).reduce(add)
-        w -= gradient
-
-    print "Final w: " + str(w)
diff --git a/pyspark/examples/pi.py b/pyspark/examples/pi.py
index 348bbc5dce..127cba029b 100644
--- a/pyspark/examples/pi.py
+++ b/pyspark/examples/pi.py
@@ -1,13 +1,14 @@
 import sys
 from random import random
 from operator import add
-from pyspark.context import SparkContext
+
+from pyspark import SparkContext
 
 
 if __name__ == "__main__":
     if len(sys.argv) == 1:
         print >> sys.stderr, \
-            "Usage: PythonPi []"
+            "Usage: PythonPi []"
         exit(-1)
     sc = SparkContext(sys.argv[1], "PythonPi")
     slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
diff --git a/pyspark/examples/tc.py b/pyspark/examples/tc.py
deleted file mode 100644
index 9630e72b47..0000000000
--- a/pyspark/examples/tc.py
+++ /dev/null
@@ -1,49 +0,0 @@
-import sys
-from random import Random
-from pyspark.context import SparkContext
-
-numEdges = 200
-numVertices = 100
-rand = Random(42)
-
-
-def generateGraph():
-    edges = set()
-    while len(edges) < numEdges:
-        src = rand.randrange(0, numEdges)
-        dst = rand.randrange(0, numEdges)
-        if src != dst:
-            edges.add((src, dst))
-    return edges
-
-
-if __name__ == "__main__":
-    if len(sys.argv) == 1:
-        print >> sys.stderr, \
-            "Usage: PythonTC []"
-        exit(-1)
-    sc = SparkContext(sys.argv[1], "PythonTC")
-    slices = sys.argv[2] if len(sys.argv) > 2 else 2
-    tc = sc.parallelize(generateGraph(), slices).cache()
-
-    # Linear transitive closure: each round grows paths by one edge,
-    # by joining the graph's edges with the already-discovered paths.
-    # e.g. join the path (y, z) from the TC with the edge (x, y) from
-    # the graph to obtain the path (x, z).
-
-    # Because join() joins on keys, the edges are stored in reversed order.
-    edges = tc.map(lambda (x, y): (y, x))
-
-    oldCount = 0L
-    nextCount = tc.count()
-    while True:
-        oldCount = nextCount
-        # Perform the join, obtaining an RDD of (y, (z, x)) pairs,
-        # then project the result to obtain the new (x, z) paths.
-        new_edges = tc.join(edges).map(lambda (_, (a, b)): (b, a))
-        tc = tc.union(new_edges).distinct().cache()
-        nextCount = tc.count()
-        if nextCount == oldCount:
-            break
-
-    print "TC has %i edges" % tc.count()
diff --git a/pyspark/examples/transitive_closure.py b/pyspark/examples/transitive_closure.py
new file mode 100644
index 0000000000..73f7f8fbaf
--- /dev/null
+++ b/pyspark/examples/transitive_closure.py
@@ -0,0 +1,50 @@
+import sys
+from random import Random
+
+from pyspark import SparkContext
+
+numEdges = 200
+numVertices = 100
+rand = Random(42)
+
+
+def generateGraph():
+    edges = set()
+    while len(edges) < numEdges:
+        src = rand.randrange(0, numEdges)
+        dst = rand.randrange(0, numEdges)
+        if src != dst:
+            edges.add((src, dst))
+    return edges
+
+
+if __name__ == "__main__":
+    if len(sys.argv) == 1:
+        print >> sys.stderr, \
+            "Usage: PythonTC []"
+        exit(-1)
+    sc = SparkContext(sys.argv[1], "PythonTC")
+    slices = sys.argv[2] if len(sys.argv) > 2 else 2
+    tc = sc.parallelize(generateGraph(), slices).cache()
+
+    # Linear transitive closure: each round grows paths by one edge,
+    # by joining the graph's edges with the already-discovered paths.
+    # e.g. join the path (y, z) from the TC with the edge (x, y) from
+    # the graph to obtain the path (x, z).
+
+    # Because join() joins on keys, the edges are stored in reversed order.
+    edges = tc.map(lambda (x, y): (y, x))
+
+    oldCount = 0L
+    nextCount = tc.count()
+    while True:
+        oldCount = nextCount
+        # Perform the join, obtaining an RDD of (y, (z, x)) pairs,
+        # then project the result to obtain the new (x, z) paths.
+        new_edges = tc.join(edges).map(lambda (_, (a, b)): (b, a))
+        tc = tc.union(new_edges).distinct().cache()
+        nextCount = tc.count()
+        if nextCount == oldCount:
+            break
+
+    print "TC has %i edges" % tc.count()
diff --git a/pyspark/examples/wordcount.py b/pyspark/examples/wordcount.py
index 8365c070e8..857160624b 100644
--- a/pyspark/examples/wordcount.py
+++ b/pyspark/examples/wordcount.py
@@ -1,6 +1,8 @@
 import sys
 from operator import add
-from pyspark.context import SparkContext
+
+from pyspark import SparkContext
+
 
 if __name__ == "__main__":
     if len(sys.argv) < 3:
diff --git a/pyspark/pyspark/__init__.py b/pyspark/pyspark/__init__.py
index 8f8402b62b..1ab360a666 100644
--- a/pyspark/pyspark/__init__.py
+++ b/pyspark/pyspark/__init__.py
@@ -1,9 +1,20 @@
+"""
+PySpark is a Python API for Spark.
+
+Public classes:
+
+    - L{SparkContext}
+        Main entry point for Spark functionality.
+    - L{RDD}
+        A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
+"""
 import sys
 import os
 sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "pyspark/lib/py4j0.7.egg"))
 
 
 from pyspark.context import SparkContext
+from pyspark.rdd import RDD
 
 
-__all__ = ["SparkContext"]
+__all__ = ["SparkContext", "RDD"]
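
A minimal usage sketch (not part of the patch itself) of what the pyspark/__init__.py change enables: SparkContext can now be imported from the package root rather than from pyspark.context, and RDD is exported as part of the public API. The master URL "local", the job name, and the input file "README.md" below are illustrative placeholders, and the word count simply mirrors the existing examples/wordcount.py in the Python 2 style these examples use.

    from pyspark import SparkContext  # previously: from pyspark.context import SparkContext

    if __name__ == "__main__":
        # "local" and "README.md" are placeholders; any master URL and text file will do.
        sc = SparkContext("local", "PythonImportDemo")
        # counts is an RDD of (word, count) pairs, built with the same chain of
        # transformations as examples/wordcount.py.
        counts = sc.textFile("README.md") \
            .flatMap(lambda line: line.split(' ')) \
            .map(lambda word: (word, 1)) \
            .reduceByKey(lambda a, b: a + b)
        for (word, count) in counts.collect():
            print "%s: %i" % (word, count)

The longer import path (pyspark.context.SparkContext) continues to work; the patch only adds the shorter spelling and exports RDD alongside it.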