author    Josh Rosen <joshrosen@eecs.berkeley.edu>  2012-08-22 00:43:55 -0700
committer Josh Rosen <joshrosen@eecs.berkeley.edu>  2012-08-22 00:43:55 -0700
commit    607b53abfca049e7d9139e2d29893a3bb252de19 (patch)
tree      795fba68b52d3d70b6369150f20d1a9da5cdb5ea /pyspark
parent    fd94e5443c99775bfad1928729f5075c900ad0f9 (diff)
Use numpy in Python k-means example.
Diffstat (limited to 'pyspark')
-rw-r--r--  pyspark/pyspark/examples/kmeans.py | 23
-rw-r--r--  pyspark/pyspark/rdd.py             |  9
-rw-r--r--  pyspark/pyspark/worker.py          |  8
3 files changed, 14 insertions(+), 26 deletions(-)
diff --git a/pyspark/pyspark/examples/kmeans.py b/pyspark/pyspark/examples/kmeans.py
index 0761d6e395..9cc366f03c 100644
--- a/pyspark/pyspark/examples/kmeans.py
+++ b/pyspark/pyspark/examples/kmeans.py
@@ -1,25 +1,18 @@
 import sys
 
 from pyspark.context import SparkContext
+from numpy import array, sum as np_sum
 
 
 def parseVector(line):
-    return [float(x) for x in line.split(' ')]
-
-
-def addVec(x, y):
-    return [a + b for (a, b) in zip(x, y)]
-
-
-def squaredDist(x, y):
-    return sum((a - b) ** 2 for (a, b) in zip(x, y))
+    return array([float(x) for x in line.split(' ')])
 
 
 def closestPoint(p, centers):
     bestIndex = 0
     closest = float("+inf")
     for i in range(len(centers)):
-        tempDist = squaredDist(p, centers[i])
+        tempDist = np_sum((p - centers[i]) ** 2)
         if tempDist < closest:
             closest = tempDist
             bestIndex = i
@@ -41,14 +34,14 @@ if __name__ == "__main__":
     tempDist = 1.0
 
     while tempDist > convergeDist:
-        closest = data.mapPairs(
+        closest = data.map(
             lambda p : (closestPoint(p, kPoints), (p, 1)))
         pointStats = closest.reduceByKey(
-            lambda (x1, y1), (x2, y2): (addVec(x1, x2), y1 + y2))
-        newPoints = pointStats.mapPairs(
-            lambda (x, (y, z)): (x, [a / z for a in y])).collect()
+            lambda (x1, y1), (x2, y2): (x1 + x2, y1 + y2))
+        newPoints = pointStats.map(
+            lambda (x, (y, z)): (x, y / z)).collect()
 
-        tempDist = sum(squaredDist(kPoints[x], y) for (x, y) in newPoints)
+        tempDist = sum(np_sum((kPoints[x] - y) ** 2) for (x, y) in newPoints)
 
         for (x, y) in newPoints:
             kPoints[x] = y
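The kmeans.py change swaps the hand-written addVec and squaredDist helpers for numpy's vectorized arithmetic: parseVector now returns a numpy array, so p - centers[i] and y / z operate elementwise. A minimal standalone sketch of the resulting distance logic, runnable outside Spark (only parseVector and closestPoint come from the diff; the sample points are made up for illustration):

from numpy import array, sum as np_sum

def parseVector(line):
    # Parse a space-separated line of floats into a numpy array.
    return array([float(x) for x in line.split(' ')])

def closestPoint(p, centers):
    # Index of the nearest center, using numpy elementwise ops in place
    # of the removed addVec/squaredDist helpers.
    bestIndex = 0
    closest = float("+inf")
    for i in range(len(centers)):
        tempDist = np_sum((p - centers[i]) ** 2)
        if tempDist < closest:
            closest = tempDist
            bestIndex = i
    return bestIndex

centers = [array([0.0, 0.0]), array([5.0, 5.0])]
print(closestPoint(parseVector('4.0 4.5'), centers))  # prints 1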
diff --git a/pyspark/pyspark/rdd.py b/pyspark/pyspark/rdd.py
index 8eccddc0a2..ff9c483032 100644
--- a/pyspark/pyspark/rdd.py
+++ b/pyspark/pyspark/rdd.py
@@ -71,7 +71,7 @@ class RDD(object):
 
     def takeSample(self, withReplacement, num, seed):
        vals = self._jrdd.takeSample(withReplacement, num, seed)
-        return [PickleSerializer.loads(x) for x in vals]
+        return [PickleSerializer.loads(bytes(x)) for x in vals]
 
     def union(self, other):
         """
@@ -218,17 +218,16 @@ class RDD(object):
 
     # TODO: pipelining
     # TODO: optimizations
-    def shuffle(self, numSplits):
+    def shuffle(self, numSplits, hashFunc=hash):
         if numSplits is None:
             numSplits = self.ctx.defaultParallelism
-        pipe_command = RDD._get_pipe_command('shuffle_map_step', [])
+        pipe_command = RDD._get_pipe_command('shuffle_map_step', [hashFunc])
         class_manifest = self._jrdd.classManifest()
         python_rdd = self.ctx.jvm.PythonPairRDD(self._jrdd.rdd(),
             pipe_command, False, self.ctx.pythonExec, class_manifest)
         partitioner = self.ctx.jvm.spark.HashPartitioner(numSplits)
         jrdd = python_rdd.asJavaPairRDD().partitionBy(partitioner)
         jrdd = jrdd.map(self.ctx.jvm.ExtractValue())
-        # TODO: extract second value.
         return RDD(jrdd, self.ctx)
@@ -277,8 +276,6 @@ class RDD(object):
         map_values_fn = lambda (k, v): (k, f(v))
         return self.map(map_values_fn, preservesPartitioning=True)
 
-    # TODO: implement shuffle.
-
     # TODO: support varargs cogroup of several RDDs.
     def groupWith(self, other):
         return self.cogroup(other)
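The rdd.py change makes the Python-side hash function pluggable: shuffle() now takes hashFunc (defaulting to the built-in hash) and ships it to the worker inside the pipe command, so keys are hashed by user-chosen Python code before the JVM's HashPartitioner assigns splits. A rough standalone sketch of the effect (plain Python, not the Spark API; partition() and the sample records are invented for illustration):

def partition(records, numSplits, hashFunc=hash):
    # Assign each (key, value) pair to a split by hashing the key,
    # mirroring what shuffle(numSplits, hashFunc) sets up across the
    # Python worker and the JVM HashPartitioner.
    splits = [[] for _ in range(numSplits)]
    for (key, value) in records:
        splits[hashFunc(key) % numSplits].append((key, value))
    return splits

records = [('apple', 1), ('Apple', 2), ('banana', 3)]
# Built-in hash: 'apple' and 'Apple' may land in different splits.
print(partition(records, 2))
# Custom hashFunc: case-insensitive keys now co-locate.
print(partition(records, 2, hashFunc=lambda k: hash(k.lower())))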
diff --git a/pyspark/pyspark/worker.py b/pyspark/pyspark/worker.py
index 21ff84fb17..b13ed5699a 100644
--- a/pyspark/pyspark/worker.py
+++ b/pyspark/pyspark/worker.py
@@ -48,9 +48,6 @@ def do_map(flat=False):
     f = load_function()
     for obj in read_input():
         try:
-            #from pickletools import dis
-            #print repr(obj)
-            #print dis(obj)
             out = f(PickleSerializer.loads(obj))
             if out is not None:
                 if flat:
@@ -64,9 +61,10 @@
 
 
 def do_shuffle_map_step():
+    hashFunc = load_function()
     for obj in read_input():
-        key = PickleSerializer.loads(obj)[1]
-        output(str(hash(key)))
+        key = PickleSerializer.loads(obj)[0]
+        output(str(hashFunc(key)))
         output(obj)
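On the worker side, do_shuffle_map_step now loads the shipped hash function before reading input, and it keys each record on element 0 of the unpickled pair rather than element 1 (the old code hashed the value, not the key). A simplified sketch of that loop, with read_input/output replaced by an in-memory list and PickleSerializer stood in by plain pickle (the wire format here is illustrative, not the real pipe protocol):

import pickle

def do_shuffle_map_step(hashFunc, pickled_records, output):
    # Emit the hash of each record's key, then the pickled record itself,
    # so the JVM side can partition records without unpickling them.
    for obj in pickled_records:
        key = pickle.loads(obj)[0]   # element 0 is the key
        output(str(hashFunc(key)))
        output(obj)

records = [pickle.dumps(('a', 1)), pickle.dumps(('b', 2))]
out = []
do_shuffle_map_step(hash, records, out.append)
print(out[0])  # hash of key 'a' as a string, followed by the pickled record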