diff options
author | FlytxtRnD <meethu.mathew@flytxt.com> | 2015-02-02 23:04:55 -0800 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2015-02-02 23:04:55 -0800 |
commit | 50a1a874e1d087a6c79835b1936d0009622a97b1 (patch) | |
tree | 81381fdb41d6bf9e3cbf59291f200fbc5ddab3d1 /python/pyspark/mllib | |
parent | c31c36c4a76bd3449696383321332ec95bff7fed (diff) | |
download | spark-50a1a874e1d087a6c79835b1936d0009622a97b1.tar.gz spark-50a1a874e1d087a6c79835b1936d0009622a97b1.tar.bz2 spark-50a1a874e1d087a6c79835b1936d0009622a97b1.zip |
[SPARK-5012][MLLib][PySpark]Python API for Gaussian Mixture Model
Python API for the Gaussian Mixture Model clustering algorithm in MLLib.
Author: FlytxtRnD <meethu.mathew@flytxt.com>
Closes #4059 from FlytxtRnD/PythonGmmWrapper and squashes the following commits:
c973ab3 [FlytxtRnD] Merge branch 'PythonGmmWrapper', remote-tracking branch 'upstream/master' into PythonGmmWrapper
339b09c [FlytxtRnD] Added MultivariateGaussian namedtuple and Arraybuffer in trainGaussianMixture
fa0a142 [FlytxtRnD] New line added
d5b36ab [FlytxtRnD] Changed argument names to lowercase
ac134f1 [FlytxtRnD] Merge branch 'PythonGmmWrapper' of https://github.com/FlytxtRnD/spark into PythonGmmWrapper
6671ea1 [FlytxtRnD] Added mllib/stat/distribution.py
3aee84b [FlytxtRnD] Fixed style issues
2e9f12a [FlytxtRnD] Added mllib/stat/distribution.py and fixed style issues
b22532c [FlytxtRnD] Merge branch 'PythonGmmWrapper', remote-tracking branch 'upstream/master' into PythonGmmWrapper
2e14d82 [FlytxtRnD] Incorporate MultivariateGaussian instances in GaussianMixtureModel
05767c7 [FlytxtRnD] Merge branch 'PythonGmmWrapper', remote-tracking branch 'upstream/master' into PythonGmmWrapper
3464d19 [FlytxtRnD] Merge branch 'PythonGmmWrapper', remote-tracking branch 'upstream/master' into PythonGmmWrapper
c1d4c71 [FlytxtRnD] Merge branch 'PythonGmmWrapper', remote-tracking branch 'origin/PythonGmmWrapper' into PythonGmmWrapper
426d130 [FlytxtRnD] Added random seed parameter
332bad1 [FlytxtRnD] Merge branch 'PythonGmmWrapper', remote-tracking branch 'upstream/master' into PythonGmmWrapper
f82750b [FlytxtRnD] Fixed style issues
5c83825 [FlytxtRnD] Split input file with space delimiter
fda60f3 [FlytxtRnD] Python API for Gaussian Mixture Model
Diffstat (limited to 'python/pyspark/mllib')
-rw-r--r-- | python/pyspark/mllib/clustering.py | 92 | ||||
-rw-r--r-- | python/pyspark/mllib/stat/__init__.py | 3 | ||||
-rw-r--r-- | python/pyspark/mllib/stat/distribution.py | 31 | ||||
-rw-r--r-- | python/pyspark/mllib/tests.py | 26 |
4 files changed, 147 insertions, 5 deletions
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 6b713aa393..f6b97abb17 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -15,19 +15,22 @@ # limitations under the License. # +from numpy import array + +from pyspark import RDD from pyspark import SparkContext from pyspark.mllib.common import callMLlibFunc, callJavaFunc -from pyspark.mllib.linalg import SparseVector, _convert_to_vector +from pyspark.mllib.linalg import DenseVector, SparseVector, _convert_to_vector +from pyspark.mllib.stat.distribution import MultivariateGaussian -__all__ = ['KMeansModel', 'KMeans'] +__all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixture'] class KMeansModel(object): """A clustering model derived from the k-means method. - >>> from numpy import array - >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2) + >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4, 2) >>> model = KMeans.train( ... sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random") >>> model.predict(array([0.0, 0.0])) == model.predict(array([1.0, 1.0])) @@ -86,6 +89,87 @@ class KMeans(object): return KMeansModel([c.toArray() for c in centers]) +class GaussianMixtureModel(object): + + """A clustering model derived from the Gaussian Mixture Model method. + + >>> clusterdata_1 = sc.parallelize(array([-0.1,-0.05,-0.01,-0.1, + ... 0.9,0.8,0.75,0.935, + ... -0.83,-0.68,-0.91,-0.76 ]).reshape(6, 2)) + >>> model = GaussianMixture.train(clusterdata_1, 3, convergenceTol=0.0001, + ... maxIterations=50, seed=10) + >>> labels = model.predict(clusterdata_1).collect() + >>> labels[0]==labels[1] + False + >>> labels[1]==labels[2] + True + >>> labels[4]==labels[5] + True + >>> clusterdata_2 = sc.parallelize(array([-5.1971, -2.5359, -3.8220, + ... -5.2211, -5.0602, 4.7118, + ... 6.8989, 3.4592, 4.6322, + ... 5.7048, 4.6567, 5.5026, + ... 4.5605, 5.2043, 6.2734]).reshape(5, 3)) + >>> model = GaussianMixture.train(clusterdata_2, 2, convergenceTol=0.0001, + ... maxIterations=150, seed=10) + >>> labels = model.predict(clusterdata_2).collect() + >>> labels[0]==labels[1]==labels[2] + True + >>> labels[3]==labels[4] + True + """ + + def __init__(self, weights, gaussians): + self.weights = weights + self.gaussians = gaussians + self.k = len(self.weights) + + def predict(self, x): + """ + Find the cluster to which the points in 'x' has maximum membership + in this model. + + :param x: RDD of data points. + :return: cluster_labels. RDD of cluster labels. + """ + if isinstance(x, RDD): + cluster_labels = self.predictSoft(x).map(lambda z: z.index(max(z))) + return cluster_labels + + def predictSoft(self, x): + """ + Find the membership of each point in 'x' to all mixture components. + + :param x: RDD of data points. + :return: membership_matrix. RDD of array of double values. + """ + if isinstance(x, RDD): + means, sigmas = zip(*[(g.mu, g.sigma) for g in self.gaussians]) + membership_matrix = callMLlibFunc("predictSoftGMM", x.map(_convert_to_vector), + self.weights, means, sigmas) + return membership_matrix + + +class GaussianMixture(object): + """ + Estimate model parameters with the expectation-maximization algorithm. + + :param data: RDD of data points + :param k: Number of components + :param convergenceTol: Threshold value to check the convergence criteria. Defaults to 1e-3 + :param maxIterations: Number of iterations. Default to 100 + :param seed: Random Seed + """ + @classmethod + def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None): + """Train a Gaussian Mixture clustering model.""" + weight, mu, sigma = callMLlibFunc("trainGaussianMixture", + rdd.map(_convert_to_vector), k, + convergenceTol, maxIterations, seed) + mvg_obj = [MultivariateGaussian(mu[i], sigma[i]) for i in range(k)] + return GaussianMixtureModel(weight, mvg_obj) + + def _test(): import doctest globs = globals().copy() diff --git a/python/pyspark/mllib/stat/__init__.py b/python/pyspark/mllib/stat/__init__.py index 799d260c09..b686d955a0 100644 --- a/python/pyspark/mllib/stat/__init__.py +++ b/python/pyspark/mllib/stat/__init__.py @@ -20,5 +20,6 @@ Python package for statistical functions in MLlib. """ from pyspark.mllib.stat._statistics import * +from pyspark.mllib.stat.distribution import MultivariateGaussian -__all__ = ["Statistics", "MultivariateStatisticalSummary"] +__all__ = ["Statistics", "MultivariateStatisticalSummary", "MultivariateGaussian"] diff --git a/python/pyspark/mllib/stat/distribution.py b/python/pyspark/mllib/stat/distribution.py new file mode 100644 index 0000000000..07792e1532 --- /dev/null +++ b/python/pyspark/mllib/stat/distribution.py @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from collections import namedtuple + +__all__ = ['MultivariateGaussian'] + + +class MultivariateGaussian(namedtuple('MultivariateGaussian', ['mu', 'sigma'])): + + """ Represents a (mu, sigma) tuple + >>> m = MultivariateGaussian(Vectors.dense([11,12]),DenseMatrix(2, 2, (1.0, 3.0, 5.0, 2.0))) + >>> (m.mu, m.sigma.toArray()) + (DenseVector([11.0, 12.0]), array([[ 1., 5.],[ 3., 2.]])) + >>> (m[0], m[1]) + (DenseVector([11.0, 12.0]), array([[ 1., 5.],[ 3., 2.]])) + """ diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 61e0cf5d90..42aa228737 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -167,6 +167,32 @@ class ListTests(PySparkTestCase): # TODO: Allow small numeric difference. self.assertTrue(array_equal(c1, c2)) + def test_gmm(self): + from pyspark.mllib.clustering import GaussianMixture + data = self.sc.parallelize([ + [1, 2], + [8, 9], + [-4, -3], + [-6, -7], + ]) + clusters = GaussianMixture.train(data, 2, convergenceTol=0.001, + maxIterations=100, seed=56) + labels = clusters.predict(data).collect() + self.assertEquals(labels[0], labels[1]) + self.assertEquals(labels[2], labels[3]) + + def test_gmm_deterministic(self): + from pyspark.mllib.clustering import GaussianMixture + x = range(0, 100, 10) + y = range(0, 100, 10) + data = self.sc.parallelize([[a, b] for a, b in zip(x, y)]) + clusters1 = GaussianMixture.train(data, 5, convergenceTol=0.001, + maxIterations=100, seed=63) + clusters2 = GaussianMixture.train(data, 5, convergenceTol=0.001, + maxIterations=100, seed=63) + for c1, c2 in zip(clusters1.weights, clusters2.weights): + self.assertEquals(round(c1, 7), round(c2, 7)) + def test_classification(self): from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes from pyspark.mllib.tree import DecisionTree, RandomForest, GradientBoostedTrees |