aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorFeynman Liang <fliang@databricks.com>2015-07-08 16:32:00 -0700
committerXiangrui Meng <meng@databricks.com>2015-07-08 16:32:00 -0700
commitf472b8cdc00839780dc79be0bbe53a098cde230c (patch)
tree52e6d95de98dfc00a5e2ce0be13d9ac1eba38677 /mllib
parent8c32b2e870c7c250a63e838718df833edf6dea07 (diff)
downloadspark-f472b8cdc00839780dc79be0bbe53a098cde230c.tar.gz
spark-f472b8cdc00839780dc79be0bbe53a098cde230c.tar.bz2
spark-f472b8cdc00839780dc79be0bbe53a098cde230c.zip
[SPARK-5016] [MLLIB] Distribute GMM mixture components to executors
Distribute expensive portions of computation for Gaussian mixture components (in particular, pre-computation of `MultivariateGaussian.rootSigmaInv`, the inverse covariance matrix and covariance determinant) across executors. Repost of PR#4654. Notes for reviewers: * What should be the policy for when to distribute computation. Always? When numClusters > threshold? User-specified param? TODO: * Performance testing and comparison for large number of clusters Author: Feynman Liang <fliang@databricks.com> Closes #7166 from feynmanliang/GMM_parallel_mixtures and squashes the following commits: 4f351fa [Feynman Liang] Update heuristic and scaladoc 5ea947e [Feynman Liang] Fix parallelization logic 00eb7db [Feynman Liang] Add helper method for GMM's M step, remove distributeGaussians flag e7c8127 [Feynman Liang] Add distributeGaussians flag and tests 1da3c7f [Feynman Liang] Distribute mixtures
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala44
1 files changed, 36 insertions, 8 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala
index fc509d2ba1..e459367333 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala
@@ -140,6 +140,10 @@ class GaussianMixture private (
// Get length of the input vectors
val d = breezeData.first().length
+ // Heuristic to distribute the computation of the [[MultivariateGaussian]]s, approximately when
+ // d > 25 except for when k is very small
+ val distributeGaussians = ((k - 1.0) / k) * d > 25
+
// Determine initial weights and corresponding Gaussians.
// If the user supplied an initial GMM, we use those values, otherwise
// we start with uniform weights, a random mean from the data, and
@@ -171,14 +175,25 @@ class GaussianMixture private (
// Create new distributions based on the partial assignments
// (often referred to as the "M" step in literature)
val sumWeights = sums.weights.sum
- var i = 0
- while (i < k) {
- val mu = sums.means(i) / sums.weights(i)
- BLAS.syr(-sums.weights(i), Vectors.fromBreeze(mu),
- Matrices.fromBreeze(sums.sigmas(i)).asInstanceOf[DenseMatrix])
- weights(i) = sums.weights(i) / sumWeights
- gaussians(i) = new MultivariateGaussian(mu, sums.sigmas(i) / sums.weights(i))
- i = i + 1
+
+ if (distributeGaussians) {
+ val numPartitions = math.min(k, 1024)
+ val tuples =
+ Seq.tabulate(k)(i => (sums.means(i), sums.sigmas(i), sums.weights(i)))
+ val (ws, gs) = sc.parallelize(tuples, numPartitions).map { case (mean, sigma, weight) =>
+ updateWeightsAndGaussians(mean, sigma, weight, sumWeights)
+ }.collect.unzip
+ Array.copy(ws, 0, weights, 0, ws.length)
+ Array.copy(gs, 0, gaussians, 0, gs.length)
+ } else {
+ var i = 0
+ while (i < k) {
+ val (weight, gaussian) =
+ updateWeightsAndGaussians(sums.means(i), sums.sigmas(i), sums.weights(i), sumWeights)
+ weights(i) = weight
+ gaussians(i) = gaussian
+ i = i + 1
+ }
}
llhp = llh // current becomes previous
@@ -192,6 +207,19 @@ class GaussianMixture private (
/** Java-friendly version of [[run()]] */
def run(data: JavaRDD[Vector]): GaussianMixtureModel = run(data.rdd)
+ private def updateWeightsAndGaussians(
+ mean: BDV[Double],
+ sigma: BreezeMatrix[Double],
+ weight: Double,
+ sumWeights: Double): (Double, MultivariateGaussian) = {
+ val mu = (mean /= weight)
+ BLAS.syr(-weight, Vectors.fromBreeze(mu),
+ Matrices.fromBreeze(sigma).asInstanceOf[DenseMatrix])
+ val newWeight = weight / sumWeights
+ val newGaussian = new MultivariateGaussian(mu, sigma / weight)
+ (newWeight, newGaussian)
+ }
+
/** Average of dense breeze vectors */
private def vectorMean(x: IndexedSeq[BV[Double]]): BDV[Double] = {
val v = BDV.zeros[Double](x(0).length)