aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorFlytxtRnD <meethu.mathew@flytxt.com>2015-02-02 23:04:55 -0800
committerXiangrui Meng <meng@databricks.com>2015-02-02 23:04:55 -0800
commit50a1a874e1d087a6c79835b1936d0009622a97b1 (patch)
tree81381fdb41d6bf9e3cbf59291f200fbc5ddab3d1 /python
parentc31c36c4a76bd3449696383321332ec95bff7fed (diff)
downloadspark-50a1a874e1d087a6c79835b1936d0009622a97b1.tar.gz
spark-50a1a874e1d087a6c79835b1936d0009622a97b1.tar.bz2
spark-50a1a874e1d087a6c79835b1936d0009622a97b1.zip
[SPARK-5012][MLLib][PySpark]Python API for Gaussian Mixture Model
Python API for the Gaussian Mixture Model clustering algorithm in MLLib. Author: FlytxtRnD <meethu.mathew@flytxt.com> Closes #4059 from FlytxtRnD/PythonGmmWrapper and squashes the following commits: c973ab3 [FlytxtRnD] Merge branch 'PythonGmmWrapper', remote-tracking branch 'upstream/master' into PythonGmmWrapper 339b09c [FlytxtRnD] Added MultivariateGaussian namedtuple and Arraybuffer in trainGaussianMixture fa0a142 [FlytxtRnD] New line added d5b36ab [FlytxtRnD] Changed argument names to lowercase ac134f1 [FlytxtRnD] Merge branch 'PythonGmmWrapper' of https://github.com/FlytxtRnD/spark into PythonGmmWrapper 6671ea1 [FlytxtRnD] Added mllib/stat/distribution.py 3aee84b [FlytxtRnD] Fixed style issues 2e9f12a [FlytxtRnD] Added mllib/stat/distribution.py and fixed style issues b22532c [FlytxtRnD] Merge branch 'PythonGmmWrapper', remote-tracking branch 'upstream/master' into PythonGmmWrapper 2e14d82 [FlytxtRnD] Incorporate MultivariateGaussian instances in GaussianMixtureModel 05767c7 [FlytxtRnD] Merge branch 'PythonGmmWrapper', remote-tracking branch 'upstream/master' into PythonGmmWrapper 3464d19 [FlytxtRnD] Merge branch 'PythonGmmWrapper', remote-tracking branch 'upstream/master' into PythonGmmWrapper c1d4c71 [FlytxtRnD] Merge branch 'PythonGmmWrapper', remote-tracking branch 'origin/PythonGmmWrapper' into PythonGmmWrapper 426d130 [FlytxtRnD] Added random seed parameter 332bad1 [FlytxtRnD] Merge branch 'PythonGmmWrapper', remote-tracking branch 'upstream/master' into PythonGmmWrapper f82750b [FlytxtRnD] Fixed style issues 5c83825 [FlytxtRnD] Split input file with space delimiter fda60f3 [FlytxtRnD] Python API for Gaussian Mixture Model
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/mllib/clustering.py92
-rw-r--r--python/pyspark/mllib/stat/__init__.py3
-rw-r--r--python/pyspark/mllib/stat/distribution.py31
-rw-r--r--python/pyspark/mllib/tests.py26
4 files changed, 147 insertions, 5 deletions
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index 6b713aa393..f6b97abb17 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -15,19 +15,22 @@
# limitations under the License.
#
+from numpy import array
+
+from pyspark import RDD
from pyspark import SparkContext
from pyspark.mllib.common import callMLlibFunc, callJavaFunc
-from pyspark.mllib.linalg import SparseVector, _convert_to_vector
+from pyspark.mllib.linalg import DenseVector, SparseVector, _convert_to_vector
+from pyspark.mllib.stat.distribution import MultivariateGaussian
-__all__ = ['KMeansModel', 'KMeans']
+__all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixture']
class KMeansModel(object):
"""A clustering model derived from the k-means method.
- >>> from numpy import array
- >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
+ >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4, 2)
>>> model = KMeans.train(
... sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random")
>>> model.predict(array([0.0, 0.0])) == model.predict(array([1.0, 1.0]))
@@ -86,6 +89,87 @@ class KMeans(object):
return KMeansModel([c.toArray() for c in centers])
+class GaussianMixtureModel(object):
+
+ """A clustering model derived from the Gaussian Mixture Model method.
+
+ >>> clusterdata_1 = sc.parallelize(array([-0.1,-0.05,-0.01,-0.1,
+ ... 0.9,0.8,0.75,0.935,
+ ... -0.83,-0.68,-0.91,-0.76 ]).reshape(6, 2))
+ >>> model = GaussianMixture.train(clusterdata_1, 3, convergenceTol=0.0001,
+ ... maxIterations=50, seed=10)
+ >>> labels = model.predict(clusterdata_1).collect()
+ >>> labels[0]==labels[1]
+ False
+ >>> labels[1]==labels[2]
+ True
+ >>> labels[4]==labels[5]
+ True
+ >>> clusterdata_2 = sc.parallelize(array([-5.1971, -2.5359, -3.8220,
+ ... -5.2211, -5.0602, 4.7118,
+ ... 6.8989, 3.4592, 4.6322,
+ ... 5.7048, 4.6567, 5.5026,
+ ... 4.5605, 5.2043, 6.2734]).reshape(5, 3))
+ >>> model = GaussianMixture.train(clusterdata_2, 2, convergenceTol=0.0001,
+ ... maxIterations=150, seed=10)
+ >>> labels = model.predict(clusterdata_2).collect()
+ >>> labels[0]==labels[1]==labels[2]
+ True
+ >>> labels[3]==labels[4]
+ True
+ """
+
+ def __init__(self, weights, gaussians):
+ self.weights = weights
+ self.gaussians = gaussians
+ self.k = len(self.weights)
+
+ def predict(self, x):
+ """
+ Find the cluster to which the points in 'x' has maximum membership
+ in this model.
+
+ :param x: RDD of data points.
+ :return: cluster_labels. RDD of cluster labels.
+ """
+ if isinstance(x, RDD):
+ cluster_labels = self.predictSoft(x).map(lambda z: z.index(max(z)))
+ return cluster_labels
+
+ def predictSoft(self, x):
+ """
+ Find the membership of each point in 'x' to all mixture components.
+
+ :param x: RDD of data points.
+ :return: membership_matrix. RDD of array of double values.
+ """
+ if isinstance(x, RDD):
+ means, sigmas = zip(*[(g.mu, g.sigma) for g in self.gaussians])
+ membership_matrix = callMLlibFunc("predictSoftGMM", x.map(_convert_to_vector),
+ self.weights, means, sigmas)
+ return membership_matrix
+
+
+class GaussianMixture(object):
+ """
+ Estimate model parameters with the expectation-maximization algorithm.
+
+ :param data: RDD of data points
+ :param k: Number of components
+ :param convergenceTol: Threshold value to check the convergence criteria. Defaults to 1e-3
+ :param maxIterations: Number of iterations. Default to 100
+ :param seed: Random Seed
+ """
+ @classmethod
+ def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None):
+ """Train a Gaussian Mixture clustering model."""
+ weight, mu, sigma = callMLlibFunc("trainGaussianMixture",
+ rdd.map(_convert_to_vector), k,
+ convergenceTol, maxIterations, seed)
+ mvg_obj = [MultivariateGaussian(mu[i], sigma[i]) for i in range(k)]
+ return GaussianMixtureModel(weight, mvg_obj)
+
+
def _test():
import doctest
globs = globals().copy()
diff --git a/python/pyspark/mllib/stat/__init__.py b/python/pyspark/mllib/stat/__init__.py
index 799d260c09..b686d955a0 100644
--- a/python/pyspark/mllib/stat/__init__.py
+++ b/python/pyspark/mllib/stat/__init__.py
@@ -20,5 +20,6 @@ Python package for statistical functions in MLlib.
"""
from pyspark.mllib.stat._statistics import *
+from pyspark.mllib.stat.distribution import MultivariateGaussian
-__all__ = ["Statistics", "MultivariateStatisticalSummary"]
+__all__ = ["Statistics", "MultivariateStatisticalSummary", "MultivariateGaussian"]
diff --git a/python/pyspark/mllib/stat/distribution.py b/python/pyspark/mllib/stat/distribution.py
new file mode 100644
index 0000000000..07792e1532
--- /dev/null
+++ b/python/pyspark/mllib/stat/distribution.py
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from collections import namedtuple
+
+__all__ = ['MultivariateGaussian']
+
+
+class MultivariateGaussian(namedtuple('MultivariateGaussian', ['mu', 'sigma'])):
+
+ """ Represents a (mu, sigma) tuple
+ >>> m = MultivariateGaussian(Vectors.dense([11,12]),DenseMatrix(2, 2, (1.0, 3.0, 5.0, 2.0)))
+ >>> (m.mu, m.sigma.toArray())
+ (DenseVector([11.0, 12.0]), array([[ 1., 5.],[ 3., 2.]]))
+ >>> (m[0], m[1])
+ (DenseVector([11.0, 12.0]), array([[ 1., 5.],[ 3., 2.]]))
+ """
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 61e0cf5d90..42aa228737 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -167,6 +167,32 @@ class ListTests(PySparkTestCase):
# TODO: Allow small numeric difference.
self.assertTrue(array_equal(c1, c2))
+ def test_gmm(self):
+ from pyspark.mllib.clustering import GaussianMixture
+ data = self.sc.parallelize([
+ [1, 2],
+ [8, 9],
+ [-4, -3],
+ [-6, -7],
+ ])
+ clusters = GaussianMixture.train(data, 2, convergenceTol=0.001,
+ maxIterations=100, seed=56)
+ labels = clusters.predict(data).collect()
+ self.assertEquals(labels[0], labels[1])
+ self.assertEquals(labels[2], labels[3])
+
+ def test_gmm_deterministic(self):
+ from pyspark.mllib.clustering import GaussianMixture
+ x = range(0, 100, 10)
+ y = range(0, 100, 10)
+ data = self.sc.parallelize([[a, b] for a, b in zip(x, y)])
+ clusters1 = GaussianMixture.train(data, 5, convergenceTol=0.001,
+ maxIterations=100, seed=63)
+ clusters2 = GaussianMixture.train(data, 5, convergenceTol=0.001,
+ maxIterations=100, seed=63)
+ for c1, c2 in zip(clusters1.weights, clusters2.weights):
+ self.assertEquals(round(c1, 7), round(c2, 7))
+
def test_classification(self):
from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes
from pyspark.mllib.tree import DecisionTree, RandomForest, GradientBoostedTrees