diff options
Diffstat (limited to 'python/pyspark/mllib/clustering.py')
-rw-r--r-- | python/pyspark/mllib/clustering.py | 38 |
1 files changed, 16 insertions, 22 deletions
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index f3e952a1d8..12c5602271 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -15,15 +15,9 @@ # limitations under the License. # -from numpy import array, dot -from math import sqrt from pyspark import SparkContext -from pyspark.mllib._common import \ - _get_unmangled_rdd, _get_unmangled_double_vector_rdd, _squared_distance, \ - _serialize_double_matrix, _deserialize_double_matrix, \ - _serialize_double_vector, _deserialize_double_vector, \ - _get_initial_weights, _serialize_rating, _regression_train_wrapper -from pyspark.mllib.linalg import SparseVector +from pyspark.serializers import PickleSerializer, AutoBatchedSerializer +from pyspark.mllib.linalg import SparseVector, _convert_to_vector __all__ = ['KMeansModel', 'KMeans'] @@ -32,6 +26,7 @@ class KMeansModel(object): """A clustering model derived from the k-means method. + >>> from numpy import array >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2) >>> model = KMeans.train( ... sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random") @@ -71,8 +66,9 @@ class KMeansModel(object): """Find the cluster to which x belongs in this model.""" best = 0 best_distance = float("inf") - for i in range(0, len(self.centers)): - distance = _squared_distance(x, self.centers[i]) + x = _convert_to_vector(x) + for i in xrange(len(self.centers)): + distance = x.squared_distance(self.centers[i]) if distance < best_distance: best = i best_distance = distance @@ -82,19 +78,17 @@ class KMeansModel(object): class KMeans(object): @classmethod - def train(cls, data, k, maxIterations=100, runs=1, initializationMode="k-means||"): + def train(cls, rdd, k, maxIterations=100, runs=1, initializationMode="k-means||"): """Train a k-means clustering model.""" - sc = data.context - dataBytes = _get_unmangled_double_vector_rdd(data) - ans = sc._jvm.PythonMLLibAPI().trainKMeansModel( - dataBytes._jrdd, k, maxIterations, runs, initializationMode) - if len(ans) != 1: - raise RuntimeError("JVM call result had unexpected length") - elif type(ans[0]) != bytearray: - raise RuntimeError("JVM call result had first element of type " - + type(ans[0]) + " which is not bytearray") - matrix = _deserialize_double_matrix(ans[0]) - return KMeansModel([row for row in matrix]) + sc = rdd.context + ser = PickleSerializer() + # cache serialized data to avoid objects over head in JVM + cached = rdd.map(_convert_to_vector)._reserialize(AutoBatchedSerializer(ser)).cache() + model = sc._jvm.PythonMLLibAPI().trainKMeansModel( + cached._to_java_object_rdd(), k, maxIterations, runs, initializationMode) + bytes = sc._jvm.SerDe.dumps(model.clusterCenters()) + centers = ser.loads(str(bytes)) + return KMeansModel([c.toArray() for c in centers]) def _test(): |