diff options
author | Joseph K. Bradley <joseph@databricks.com> | 2015-08-23 18:34:07 -0700 |
---|---|---|
committer | Joseph K. Bradley <joseph@databricks.com> | 2015-08-23 18:34:07 -0700 |
commit | b963c19a803c5a26c9b65655d40ca6621acf8bd4 (patch) | |
tree | 0f86985c15a241293326d21e582d2ff6437e4c25 /mllib | |
parent | c6df5f66d9a8b9760f2cd46fcd930f977650c9c5 (diff) | |
download | spark-b963c19a803c5a26c9b65655d40ca6621acf8bd4.tar.gz spark-b963c19a803c5a26c9b65655d40ca6621acf8bd4.tar.bz2 spark-b963c19a803c5a26c9b65655d40ca6621acf8bd4.zip |
[SPARK-10164] [MLLIB] Fixed GMM distributed decomposition bug
GaussianMixture now distributes matrix decompositions for certain problem sizes. Distributed computation actually fails, but this was not tested in unit tests.
This PR adds a unit test which checks this. It failed previously but works with this fix.
CC: mengxr
Author: Joseph K. Bradley <joseph@databricks.com>
Closes #8370 from jkbradley/gmm-fix.
Diffstat (limited to 'mllib')
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala | 22 | ||||
-rw-r--r-- | mllib/src/test/scala/org/apache/spark/mllib/clustering/GaussianMixtureSuite.scala | 22 |
2 files changed, 35 insertions, 9 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala index fcc9dfecac..daa947e81d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala @@ -169,9 +169,7 @@ class GaussianMixture private ( // Get length of the input vectors val d = breezeData.first().length - // Heuristic to distribute the computation of the [[MultivariateGaussian]]s, approximately when - // d > 25 except for when k is very small - val distributeGaussians = ((k - 1.0) / k) * d > 25 + val shouldDistributeGaussians = GaussianMixture.shouldDistributeGaussians(k, d) // Determine initial weights and corresponding Gaussians. // If the user supplied an initial GMM, we use those values, otherwise @@ -205,15 +203,15 @@ class GaussianMixture private ( // (often referred to as the "M" step in literature) val sumWeights = sums.weights.sum - if (distributeGaussians) { + if (shouldDistributeGaussians) { val numPartitions = math.min(k, 1024) val tuples = Seq.tabulate(k)(i => (sums.means(i), sums.sigmas(i), sums.weights(i))) val (ws, gs) = sc.parallelize(tuples, numPartitions).map { case (mean, sigma, weight) => updateWeightsAndGaussians(mean, sigma, weight, sumWeights) - }.collect.unzip - Array.copy(ws, 0, weights, 0, ws.length) - Array.copy(gs, 0, gaussians, 0, gs.length) + }.collect().unzip + Array.copy(ws.toArray, 0, weights, 0, ws.length) + Array.copy(gs.toArray, 0, gaussians, 0, gs.length) } else { var i = 0 while (i < k) { @@ -271,6 +269,16 @@ class GaussianMixture private ( } } +private[clustering] object GaussianMixture { + /** + * Heuristic to distribute the computation of the [[MultivariateGaussian]]s, approximately when + * d > 25 except for when k is very small. + * @param k Number of topics + * @param d Number of features + */ + def shouldDistributeGaussians(k: Int, d: Int): Boolean = ((k - 1.0) / k) * d > 25 +} + // companion class to provide zero constructor for ExpectationSum private object ExpectationSum { def zero(k: Int, d: Int): ExpectationSum = { diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/GaussianMixtureSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/GaussianMixtureSuite.scala index b636d02f78..a72723eb00 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/GaussianMixtureSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/GaussianMixtureSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.mllib.clustering import org.apache.spark.SparkFunSuite -import org.apache.spark.mllib.linalg.{Vectors, Matrices} +import org.apache.spark.mllib.linalg.{Vector, Vectors, Matrices} import org.apache.spark.mllib.stat.distribution.MultivariateGaussian import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.mllib.util.TestingUtils._ @@ -76,6 +76,20 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext { assert(gmm.gaussians(1).sigma ~== Esigma(1) absTol 1E-3) } + test("two clusters with distributed decompositions") { + val data = sc.parallelize(GaussianTestData.data2, 2) + + val k = 5 + val d = data.first().size + assert(GaussianMixture.shouldDistributeGaussians(k, d)) + + val gmm = new GaussianMixture() + .setK(k) + .run(data) + + assert(gmm.k === k) + } + test("single cluster with sparse data") { val data = sc.parallelize(Array( Vectors.sparse(3, Array(0, 2), Array(4.0, 2.0)), @@ -116,7 +130,7 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext { val sparseGMM = new GaussianMixture() .setK(2) .setInitialModel(initialGmm) - .run(data) + .run(sparseData) assert(sparseGMM.weights(0) ~== Ew(0) absTol 1E-3) assert(sparseGMM.weights(1) ~== Ew(1) absTol 1E-3) @@ -168,5 +182,9 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext { Vectors.dense( 4.5605), Vectors.dense( 5.2043), Vectors.dense( 6.2734) ) + val data2: Array[Vector] = Array.tabulate(25){ i: Int => + Vectors.dense(Array.tabulate(50)(i + _.toDouble)) + } + } } |