aboutsummaryrefslogtreecommitdiff
path: root/pyspark/pyspark/examples/kmeans.py
diff options
context:
space:
mode:
Diffstat (limited to 'pyspark/pyspark/examples/kmeans.py')
-rw-r--r--pyspark/pyspark/examples/kmeans.py23
1 files changed, 8 insertions, 15 deletions
diff --git a/pyspark/pyspark/examples/kmeans.py b/pyspark/pyspark/examples/kmeans.py
index 0761d6e395..9cc366f03c 100644
--- a/pyspark/pyspark/examples/kmeans.py
+++ b/pyspark/pyspark/examples/kmeans.py
@@ -1,25 +1,18 @@
import sys
from pyspark.context import SparkContext
+from numpy import array, sum as np_sum
def parseVector(line):
- return [float(x) for x in line.split(' ')]
-
-
-def addVec(x, y):
- return [a + b for (a, b) in zip(x, y)]
-
-
-def squaredDist(x, y):
- return sum((a - b) ** 2 for (a, b) in zip(x, y))
+ return array([float(x) for x in line.split(' ')])
def closestPoint(p, centers):
bestIndex = 0
closest = float("+inf")
for i in range(len(centers)):
- tempDist = squaredDist(p, centers[i])
+ tempDist = np_sum((p - centers[i]) ** 2)
if tempDist < closest:
closest = tempDist
bestIndex = i
@@ -41,14 +34,14 @@ if __name__ == "__main__":
tempDist = 1.0
while tempDist > convergeDist:
- closest = data.mapPairs(
+ closest = data.map(
lambda p : (closestPoint(p, kPoints), (p, 1)))
pointStats = closest.reduceByKey(
- lambda (x1, y1), (x2, y2): (addVec(x1, x2), y1 + y2))
- newPoints = pointStats.mapPairs(
- lambda (x, (y, z)): (x, [a / z for a in y])).collect()
+ lambda (x1, y1), (x2, y2): (x1 + x2, y1 + y2))
+ newPoints = pointStats.map(
+ lambda (x, (y, z)): (x, y / z)).collect()
- tempDist = sum(squaredDist(kPoints[x], y) for (x, y) in newPoints)
+ tempDist = sum(np_sum((kPoints[x] - y) ** 2) for (x, y) in newPoints)
for (x, y) in newPoints:
kPoints[x] = y