aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/mllib/clustering.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/mllib/clustering.py')
-rw-r--r--python/pyspark/mllib/clustering.py38
1 files changed, 16 insertions, 22 deletions
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index f3e952a1d8..12c5602271 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -15,15 +15,9 @@
# limitations under the License.
#
-from numpy import array, dot
-from math import sqrt
from pyspark import SparkContext
-from pyspark.mllib._common import \
- _get_unmangled_rdd, _get_unmangled_double_vector_rdd, _squared_distance, \
- _serialize_double_matrix, _deserialize_double_matrix, \
- _serialize_double_vector, _deserialize_double_vector, \
- _get_initial_weights, _serialize_rating, _regression_train_wrapper
-from pyspark.mllib.linalg import SparseVector
+from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
+from pyspark.mllib.linalg import SparseVector, _convert_to_vector
__all__ = ['KMeansModel', 'KMeans']
@@ -32,6 +26,7 @@ class KMeansModel(object):
"""A clustering model derived from the k-means method.
+ >>> from numpy import array
>>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
>>> model = KMeans.train(
... sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random")
@@ -71,8 +66,9 @@ class KMeansModel(object):
"""Find the cluster to which x belongs in this model."""
best = 0
best_distance = float("inf")
- for i in range(0, len(self.centers)):
- distance = _squared_distance(x, self.centers[i])
+ x = _convert_to_vector(x)
+ for i in xrange(len(self.centers)):
+ distance = x.squared_distance(self.centers[i])
if distance < best_distance:
best = i
best_distance = distance
@@ -82,19 +78,17 @@ class KMeansModel(object):
class KMeans(object):
@classmethod
- def train(cls, data, k, maxIterations=100, runs=1, initializationMode="k-means||"):
+ def train(cls, rdd, k, maxIterations=100, runs=1, initializationMode="k-means||"):
"""Train a k-means clustering model."""
- sc = data.context
- dataBytes = _get_unmangled_double_vector_rdd(data)
- ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(
- dataBytes._jrdd, k, maxIterations, runs, initializationMode)
- if len(ans) != 1:
- raise RuntimeError("JVM call result had unexpected length")
- elif type(ans[0]) != bytearray:
- raise RuntimeError("JVM call result had first element of type "
- + type(ans[0]) + " which is not bytearray")
- matrix = _deserialize_double_matrix(ans[0])
- return KMeansModel([row for row in matrix])
+ sc = rdd.context
+ ser = PickleSerializer()
+ # cache serialized data to avoid objects over head in JVM
+ cached = rdd.map(_convert_to_vector)._reserialize(AutoBatchedSerializer(ser)).cache()
+ model = sc._jvm.PythonMLLibAPI().trainKMeansModel(
+ cached._to_java_object_rdd(), k, maxIterations, runs, initializationMode)
+ bytes = sc._jvm.SerDe.dumps(model.clusterCenters())
+ centers = ser.loads(str(bytes))
+ return KMeansModel([c.toArray() for c in centers])
def _test():