aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoseph K. Bradley <joseph@databricks.com>2015-08-23 18:34:07 -0700
committerJoseph K. Bradley <joseph@databricks.com>2015-08-23 18:34:15 -0700
commit00f812d38aeb179290a710e3af1e0c11cc16da71 (patch)
treee78ff50da526088a887445f4d14ecc7c76b56487
parent1c5a828b7b90248739ce1d7754a5394a877c5508 (diff)
downloadspark-00f812d38aeb179290a710e3af1e0c11cc16da71.tar.gz
spark-00f812d38aeb179290a710e3af1e0c11cc16da71.tar.bz2
spark-00f812d38aeb179290a710e3af1e0c11cc16da71.zip
[SPARK-10164] [MLLIB] Fixed GMM distributed decomposition bug
GaussianMixture now distributes matrix decompositions for certain problem sizes. Distributed computation actually fails, but this was not tested in unit tests. This PR adds a unit test which checks this. It failed previously but works with this fix. CC: mengxr Author: Joseph K. Bradley <joseph@databricks.com> Closes #8370 from jkbradley/gmm-fix. (cherry picked from commit b963c19a803c5a26c9b65655d40ca6621acf8bd4) Signed-off-by: Joseph K. Bradley <joseph@databricks.com>
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala22
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/clustering/GaussianMixtureSuite.scala22
2 files changed, 35 insertions, 9 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala
index fcc9dfecac..daa947e81d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala
@@ -169,9 +169,7 @@ class GaussianMixture private (
// Get length of the input vectors
val d = breezeData.first().length
- // Heuristic to distribute the computation of the [[MultivariateGaussian]]s, approximately when
- // d > 25 except for when k is very small
- val distributeGaussians = ((k - 1.0) / k) * d > 25
+ val shouldDistributeGaussians = GaussianMixture.shouldDistributeGaussians(k, d)
// Determine initial weights and corresponding Gaussians.
// If the user supplied an initial GMM, we use those values, otherwise
@@ -205,15 +203,15 @@ class GaussianMixture private (
// (often referred to as the "M" step in literature)
val sumWeights = sums.weights.sum
- if (distributeGaussians) {
+ if (shouldDistributeGaussians) {
val numPartitions = math.min(k, 1024)
val tuples =
Seq.tabulate(k)(i => (sums.means(i), sums.sigmas(i), sums.weights(i)))
val (ws, gs) = sc.parallelize(tuples, numPartitions).map { case (mean, sigma, weight) =>
updateWeightsAndGaussians(mean, sigma, weight, sumWeights)
- }.collect.unzip
- Array.copy(ws, 0, weights, 0, ws.length)
- Array.copy(gs, 0, gaussians, 0, gs.length)
+ }.collect().unzip
+ Array.copy(ws.toArray, 0, weights, 0, ws.length)
+ Array.copy(gs.toArray, 0, gaussians, 0, gs.length)
} else {
var i = 0
while (i < k) {
@@ -271,6 +269,16 @@ class GaussianMixture private (
}
}
+private[clustering] object GaussianMixture {
+ /**
+ * Heuristic to distribute the computation of the [[MultivariateGaussian]]s, approximately when
+ * d > 25 except for when k is very small.
+ * @param k Number of mixture components (clusters); "topics" is LDA terminology and does not apply here
+ * @param d Number of features
+ */
+ def shouldDistributeGaussians(k: Int, d: Int): Boolean = ((k - 1.0) / k) * d > 25
+}
+
// companion class to provide zero constructor for ExpectationSum
private object ExpectationSum {
def zero(k: Int, d: Int): ExpectationSum = {
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/GaussianMixtureSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/GaussianMixtureSuite.scala
index b636d02f78..a72723eb00 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/GaussianMixtureSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/GaussianMixtureSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.mllib.clustering
import org.apache.spark.SparkFunSuite
-import org.apache.spark.mllib.linalg.{Vectors, Matrices}
+import org.apache.spark.mllib.linalg.{Vector, Vectors, Matrices}
import org.apache.spark.mllib.stat.distribution.MultivariateGaussian
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.mllib.util.TestingUtils._
@@ -76,6 +76,20 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext {
assert(gmm.gaussians(1).sigma ~== Esigma(1) absTol 1E-3)
}
+ test("two clusters with distributed decompositions") {
+ val data = sc.parallelize(GaussianTestData.data2, 2)
+
+ val k = 5
+ val d = data.first().size
+ assert(GaussianMixture.shouldDistributeGaussians(k, d))
+
+ val gmm = new GaussianMixture()
+ .setK(k)
+ .run(data)
+
+ assert(gmm.k === k)
+ }
+
test("single cluster with sparse data") {
val data = sc.parallelize(Array(
Vectors.sparse(3, Array(0, 2), Array(4.0, 2.0)),
@@ -116,7 +130,7 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext {
val sparseGMM = new GaussianMixture()
.setK(2)
.setInitialModel(initialGmm)
- .run(data)
+ .run(sparseData)
assert(sparseGMM.weights(0) ~== Ew(0) absTol 1E-3)
assert(sparseGMM.weights(1) ~== Ew(1) absTol 1E-3)
@@ -168,5 +182,9 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext {
Vectors.dense( 4.5605), Vectors.dense( 5.2043), Vectors.dense( 6.2734)
)
+ val data2: Array[Vector] = Array.tabulate(25){ i: Int =>
+ Vectors.dense(Array.tabulate(50)(i + _.toDouble))
+ }
+
}
}