diff options
Diffstat (limited to 'pyspark/examples/kmeans.py')
-rw-r--r-- | pyspark/examples/kmeans.py | 52 |
1 files changed, 0 insertions, 52 deletions
diff --git a/pyspark/examples/kmeans.py b/pyspark/examples/kmeans.py deleted file mode 100644 index ad2be21178..0000000000 --- a/pyspark/examples/kmeans.py +++ /dev/null @@ -1,52 +0,0 @@ -""" -This example requires numpy (http://www.numpy.org/) -""" -import sys - -import numpy as np -from pyspark import SparkContext - - -def parseVector(line): - return np.array([float(x) for x in line.split(' ')]) - - -def closestPoint(p, centers): - bestIndex = 0 - closest = float("+inf") - for i in range(len(centers)): - tempDist = np.sum((p - centers[i]) ** 2) - if tempDist < closest: - closest = tempDist - bestIndex = i - return bestIndex - - -if __name__ == "__main__": - if len(sys.argv) < 5: - print >> sys.stderr, \ - "Usage: PythonKMeans <master> <file> <k> <convergeDist>" - exit(-1) - sc = SparkContext(sys.argv[1], "PythonKMeans") - lines = sc.textFile(sys.argv[2]) - data = lines.map(parseVector).cache() - K = int(sys.argv[3]) - convergeDist = float(sys.argv[4]) - - kPoints = data.takeSample(False, K, 34) - tempDist = 1.0 - - while tempDist > convergeDist: - closest = data.map( - lambda p : (closestPoint(p, kPoints), (p, 1))) - pointStats = closest.reduceByKey( - lambda (x1, y1), (x2, y2): (x1 + x2, y1 + y2)) - newPoints = pointStats.map( - lambda (x, (y, z)): (x, y / z)).collect() - - tempDist = sum(np.sum((kPoints[x] - y) ** 2) for (x, y) in newPoints) - - for (x, y) in newPoints: - kPoints[x] = y - - print "Final centers: " + str(kPoints) |