Diffstat (limited to 'python/examples')
-rwxr-xr-x                python/examples/als.py                     5
-rwxr-xr-x [-rw-r--r--]   python/examples/kmeans.py                  3
-rwxr-xr-x                python/examples/logistic_regression.py    54
-rwxr-xr-x [-rw-r--r--]   python/examples/pi.py                      3
-rwxr-xr-x [-rw-r--r--]   python/examples/transitive_closure.py      5
-rwxr-xr-x [-rw-r--r--]   python/examples/wordcount.py               3
6 files changed, 33 insertions, 40 deletions
diff --git a/python/examples/als.py b/python/examples/als.py
index f2b2eee64c..a77dfb2577 100755
--- a/python/examples/als.py
+++ b/python/examples/als.py
@@ -48,8 +48,7 @@ def update(i, vec, mat, ratings):
 if __name__ == "__main__":
     if len(sys.argv) < 2:
-        print >> sys.stderr, \
-            "Usage: PythonALS <master> <M> <U> <F> <iters> <slices>"
+        print >> sys.stderr, "Usage: als <master> <M> <U> <F> <iters> <slices>"
         exit(-1)
     sc = SparkContext(sys.argv[1], "PythonALS", pyFiles=[realpath(__file__)])
     M = int(sys.argv[2]) if len(sys.argv) > 2 else 100
@@ -84,5 +83,5 @@ if __name__ == "__main__":
         usb = sc.broadcast(us)
         error = rmse(R, ms, us)
-        print "Iteration %d:" % i
+        print "Iteration %d:" % i
         print "\nRMSE: %5.4f\n" % error
diff --git a/python/examples/kmeans.py b/python/examples/kmeans.py
index c670556f2b..ba31af92fc 100644..100755
--- a/python/examples/kmeans.py
+++ b/python/examples/kmeans.py
@@ -41,8 +41,7 @@ def closestPoint(p, centers):
 if __name__ == "__main__":
     if len(sys.argv) < 5:
-        print >> sys.stderr, \
-            "Usage: PythonKMeans <master> <file> <k> <convergeDist>"
+        print >> sys.stderr, "Usage: kmeans <master> <file> <k> <convergeDist>"
         exit(-1)
     sc = SparkContext(sys.argv[1], "PythonKMeans")
     lines = sc.textFile(sys.argv[2])
diff --git a/python/examples/logistic_regression.py b/python/examples/logistic_regression.py
index 54d227d0d3..1117dea538 100755
--- a/python/examples/logistic_regression.py
+++ b/python/examples/logistic_regression.py
@@ -16,7 +16,8 @@
 #
 """
-This example requires numpy (http://www.numpy.org/)
+A logistic regression implementation that uses NumPy (http://www.numpy.org) to act on batches
+of input data using efficient matrix operations.
 """
 from collections import namedtuple
 from math import exp
@@ -27,48 +28,45 @@ import numpy as np
 from pyspark import SparkContext
-N = 100000 # Number of data points
 D = 10 # Number of dimensions
-R = 0.7 # Scaling factor
-ITERATIONS = 5
-np.random.seed(42)
-DataPoint = namedtuple("DataPoint", ['x', 'y'])
-from lr import DataPoint # So that DataPoint is properly serialized
-
-
-def generateData():
-    def generatePoint(i):
-        y = -1 if i % 2 == 0 else 1
-        x = np.random.normal(size=D) + (y * R)
-        return DataPoint(x, y)
-    return [generatePoint(i) for i in range(N)]
-
+# Read a batch of points from the input file into a NumPy matrix object. We operate on batches to
+# make further computations faster.
+# The data file contains lines of the form <label> <x1> <x2> ... <xD>. We load each block of these
+# into a NumPy array of size numLines * (D + 1) and pull out column 0 vs the others in gradient().
+def readPointBatch(iterator):
+    strs = list(iterator)
+    matrix = np.zeros((len(strs), D + 1))
+    for i in xrange(len(strs)):
+        matrix[i] = np.fromstring(strs[i].replace(',', ' '), dtype=np.float32, sep=' ')
+    return [matrix]
 if __name__ == "__main__":
-    if len(sys.argv) == 1:
-        print >> sys.stderr, \
-            "Usage: PythonLR <master> [<slices>]"
+    if len(sys.argv) != 4:
+        print >> sys.stderr, "Usage: logistic_regression <master> <file> <iters>"
         exit(-1)
     sc = SparkContext(sys.argv[1], "PythonLR", pyFiles=[realpath(__file__)])
-    slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
-    points = sc.parallelize(generateData(), slices).cache()
+    points = sc.textFile(sys.argv[2]).mapPartitions(readPointBatch).cache()
+    iterations = int(sys.argv[3])
     # Initialize w to a random value
     w = 2 * np.random.ranf(size=D) - 1
     print "Initial w: " + str(w)
+    # Compute logistic regression gradient for a matrix of data points
+    def gradient(matrix, w):
+        Y = matrix[:,0] # point labels (first column of input file)
+        X = matrix[:,1:] # point coordinates
+        # For each point (x, y), compute gradient function, then sum these up
+        return ((1.0 / (1.0 + np.exp(-Y * X.dot(w))) - 1.0) * Y * X.T).sum(1)
+
     def add(x, y):
         x += y
         return x
-    for i in range(1, ITERATIONS + 1):
-        print "On iteration %i" % i
-
-        gradient = points.map(lambda p:
-            (1.0 / (1.0 + exp(-p.y * np.dot(w, p.x)))) * p.y * p.x
-        ).reduce(add)
-        w -= gradient
+    for i in range(iterations):
+        print "On iteration %i" % (i + 1)
+        w -= points.map(lambda m: gradient(m, w)).reduce(add)
     print "Final w: " + str(w)
diff --git a/python/examples/pi.py b/python/examples/pi.py
index 33c026e824..ab0645fc2f 100644..100755
--- a/python/examples/pi.py
+++ b/python/examples/pi.py
@@ -24,8 +24,7 @@ from pyspark import SparkContext
 if __name__ == "__main__":
     if len(sys.argv) == 1:
-        print >> sys.stderr, \
-            "Usage: PythonPi <master> [<slices>]"
+        print >> sys.stderr, "Usage: pi <master> [<slices>]"
         exit(-1)
     sc = SparkContext(sys.argv[1], "PythonPi")
     slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
diff --git a/python/examples/transitive_closure.py b/python/examples/transitive_closure.py
index 40be3b5000..744cce6651 100644..100755
--- a/python/examples/transitive_closure.py
+++ b/python/examples/transitive_closure.py
@@ -37,10 +37,9 @@ def generateGraph():
 if __name__ == "__main__":
     if len(sys.argv) == 1:
-        print >> sys.stderr, \
-            "Usage: PythonTC <master> [<slices>]"
+        print >> sys.stderr, "Usage: transitive_closure <master> [<slices>]"
         exit(-1)
-    sc = SparkContext(sys.argv[1], "PythonTC")
+    sc = SparkContext(sys.argv[1], "PythonTransitiveClosure")
     slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
     tc = sc.parallelize(generateGraph(), slices).cache()
diff --git a/python/examples/wordcount.py b/python/examples/wordcount.py
index 41c846ba79..a6de22766a 100644..100755
--- a/python/examples/wordcount.py
+++ b/python/examples/wordcount.py
@@ -23,8 +23,7 @@ from pyspark import SparkContext
 if __name__ == "__main__":
     if len(sys.argv) < 3:
-        print >> sys.stderr, \
-            "Usage: PythonWordCount <master> <file>"
+        print >> sys.stderr, "Usage: wordcount <master> <file>"
         exit(-1)
     sc = SparkContext(sys.argv[1], "PythonWordCount")
     lines = sc.textFile(sys.argv[2], 1)