aboutsummaryrefslogtreecommitdiff
path: root/mllib/src/test
diff options
context:
space:
mode:
authorYanbo Liang <ybliang8@gmail.com>2017-01-09 21:38:46 -0800
committerYanbo Liang <ybliang8@gmail.com>2017-01-09 21:38:46 -0800
commit3ef6d98a803fdff182ab4556c3273ec5fa0ff002 (patch)
tree1d8dba974664353e4146b7d9ca801e0473d0d9f4 /mllib/src/test
parentfaabe69cc081145f43f9c68db1a7a8c5c39684fb (diff)
downloadspark-3ef6d98a803fdff182ab4556c3273ec5fa0ff002.tar.gz
spark-3ef6d98a803fdff182ab4556c3273ec5fa0ff002.tar.bz2
spark-3ef6d98a803fdff182ab4556c3273ec5fa0ff002.zip
[SPARK-17847][ML] Reduce shuffled data size of GaussianMixture & copy the implementation from mllib to ml
## What changes were proposed in this pull request? Copy `GaussianMixture` implementation from mllib to ml, then we can add new features to it. I left mllib `GaussianMixture` untouched, unlike some other algorithms to wrap the ml implementation. For the following reasons: - mllib `GaussianMixture` allows k == 1, but ml does not. - mllib `GaussianMixture` supports setting initial model, but ml does not support currently. (We will definitely add this feature for ml in the future) We can get around these issues to make mllib as a wrapper calling into ml, but I'd prefer to leave mllib untouched which can make ml clean. Meanwhile, There is a big performance improvement for `GaussianMixture` in this PR. Since the covariance matrix of multivariate gaussian distribution is symmetric, we can only store the upper triangular part of the matrix and it will greatly reduce the shuffled data size. In my test, this change will reduce shuffled data size by about 50% and accelerate the job execution. Before this PR: ![image](https://cloud.githubusercontent.com/assets/1962026/19641622/4bb017ac-9996-11e6-8ece-83db184b620a.png) After this PR: ![image](https://cloud.githubusercontent.com/assets/1962026/19641635/629c21fe-9996-11e6-91e9-83ab74ae0126.png) ## How was this patch tested? Existing tests and added new tests. Author: Yanbo Liang <ybliang8@gmail.com> Closes #15413 from yanboliang/spark-17847.
Diffstat (limited to 'mllib/src/test')
-rw-r--r--mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala148
1 files changed, 146 insertions, 2 deletions
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
index 07299123f8..a362aeea39 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
@@ -18,22 +18,39 @@
package org.apache.spark.ml.clustering
import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.linalg.{DenseMatrix, Matrices, Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
+import org.apache.spark.ml.stat.distribution.MultivariateGaussian
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
+import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.{Dataset, Row}
class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
with DefaultReadWriteTest {
+ import testImplicits._
+ import GaussianMixtureSuite._
+
final val k = 5
+ private val seed = 538009335
@transient var dataset: Dataset[_] = _
+ @transient var denseDataset: Dataset[_] = _
+ @transient var sparseDataset: Dataset[_] = _
+ @transient var decompositionDataset: Dataset[_] = _
+ @transient var rDataset: Dataset[_] = _
override def beforeAll(): Unit = {
super.beforeAll()
dataset = KMeansSuite.generateKMeansData(spark, 50, 3, k)
+ denseDataset = denseData.map(FeatureData).toDF()
+ sparseDataset = denseData.map { point =>
+ FeatureData(point.toSparse)
+ }.toDF()
+ decompositionDataset = decompositionData.map(FeatureData).toDF()
+ rDataset = rData.map(FeatureData).toDF()
}
test("default parameters") {
@@ -94,6 +111,15 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
assert(transformed.columns.contains(column))
}
+ // Check prediction matches the highest probability, and probabilities sum to one.
+ transformed.select(predictionColName, probabilityColName).collect().foreach {
+ case Row(pred: Int, prob: Vector) =>
+ val probArray = prob.toArray
+ val predFromProb = probArray.zipWithIndex.maxBy(_._1)._2
+ assert(pred === predFromProb)
+ assert(probArray.sum ~== 1.0 absTol 1E-5)
+ }
+
// Check validity of model summary
val numRows = dataset.count()
assert(model.hasSummary)
@@ -126,9 +152,93 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
testEstimatorAndModelReadWrite(gm, dataset,
GaussianMixtureSuite.allParamSettings, checkModelData)
}
+
+ test("univariate dense/sparse data with two clusters") {
+ val weights = Array(2.0 / 3.0, 1.0 / 3.0)
+ val means = Array(Vectors.dense(5.1604), Vectors.dense(-4.3673))
+ val covs = Array(Matrices.dense(1, 1, Array(0.86644)), Matrices.dense(1, 1, Array(1.1098)))
+ val gaussians = means.zip(covs).map { case (mean, cov) =>
+ new MultivariateGaussian(mean, cov)
+ }
+ val expected = new GaussianMixtureModel("dummy", weights, gaussians)
+
+ Seq(denseDataset, sparseDataset).foreach { dataset =>
+ val actual = new GaussianMixture().setK(2).setSeed(seed).fit(dataset)
+ modelEquals(expected, actual)
+ }
+ }
+
+ test("check distributed decomposition") {
+ val k = 5
+ val d = decompositionData.head.size
+ assert(GaussianMixture.shouldDistributeGaussians(k, d))
+
+ val gmm = new GaussianMixture().setK(k).setSeed(seed).fit(decompositionDataset)
+ assert(gmm.getK === k)
+ }
+
+ test("multivariate data and check againt R mvnormalmixEM") {
+ /*
+ Using the following R code to generate data and train the model using mixtools package.
+ library(mvtnorm)
+ library(mixtools)
+ set.seed(1)
+ a <- rmvnorm(7, c(0, 0))
+ b <- rmvnorm(8, c(10, 10))
+ data <- rbind(a, b)
+ model <- mvnormalmixEM(data, k = 2)
+ model$lambda
+
+ [1] 0.4666667 0.5333333
+
+ model$mu
+
+ [1] 0.11731091 -0.06192351
+ [1] 10.363673 9.897081
+
+ model$sigma
+
+ [[1]]
+ [,1] [,2]
+ [1,] 0.62049934 0.06880802
+ [2,] 0.06880802 1.27431874
+
+ [[2]]
+ [,1] [,2]
+ [1,] 0.2961543 0.160783
+ [2,] 0.1607830 1.008878
+ */
+ val weights = Array(0.5333333, 0.4666667)
+ val means = Array(Vectors.dense(10.363673, 9.897081), Vectors.dense(0.11731091, -0.06192351))
+ val covs = Array(Matrices.dense(2, 2, Array(0.2961543, 0.1607830, 0.160783, 1.008878)),
+ Matrices.dense(2, 2, Array(0.62049934, 0.06880802, 0.06880802, 1.27431874)))
+ val gaussians = means.zip(covs).map { case (mean, cov) =>
+ new MultivariateGaussian(mean, cov)
+ }
+
+ val expected = new GaussianMixtureModel("dummy", weights, gaussians)
+ val actual = new GaussianMixture().setK(2).setSeed(seed).fit(rDataset)
+ modelEquals(expected, actual)
+ }
+
+ test("upper triangular matrix unpacking") {
+ /*
+ The full symmetric matrix is as follows:
+ 1.0 2.5 3.8 0.9
+ 2.5 2.0 7.2 3.8
+ 3.8 7.2 3.0 1.0
+ 0.9 3.8 1.0 4.0
+ */
+ val triangularValues = Array(1.0, 2.5, 2.0, 3.8, 7.2, 3.0, 0.9, 3.8, 1.0, 4.0)
+ val symmetricValues = Array(1.0, 2.5, 3.8, 0.9, 2.5, 2.0, 7.2, 3.8,
+ 3.8, 7.2, 3.0, 1.0, 0.9, 3.8, 1.0, 4.0)
+ val symmetricMatrix = new DenseMatrix(4, 4, symmetricValues)
+ val expectedMatrix = GaussianMixture.unpackUpperTriangularMatrix(4, triangularValues)
+ assert(symmetricMatrix === expectedMatrix)
+ }
}
-object GaussianMixtureSuite {
+object GaussianMixtureSuite extends SparkFunSuite {
/**
* Mapping from all Params to valid settings which differ from the defaults.
* This is useful for tests which need to exercise all Params, such as save/load.
@@ -141,4 +251,38 @@ object GaussianMixtureSuite {
"maxIter" -> 2,
"tol" -> 0.01
)
+
+ val denseData = Seq(
+ Vectors.dense(-5.1971), Vectors.dense(-2.5359), Vectors.dense(-3.8220),
+ Vectors.dense(-5.2211), Vectors.dense(-5.0602), Vectors.dense( 4.7118),
+ Vectors.dense( 6.8989), Vectors.dense( 3.4592), Vectors.dense( 4.6322),
+ Vectors.dense( 5.7048), Vectors.dense( 4.6567), Vectors.dense( 5.5026),
+ Vectors.dense( 4.5605), Vectors.dense( 5.2043), Vectors.dense( 6.2734)
+ )
+
+ val decompositionData: Seq[Vector] = Seq.tabulate(25) { i: Int =>
+ Vectors.dense(Array.tabulate(50)(i + _.toDouble))
+ }
+
+ val rData = Seq(
+ Vectors.dense(-0.6264538, 0.1836433), Vectors.dense(-0.8356286, 1.5952808),
+ Vectors.dense(0.3295078, -0.8204684), Vectors.dense(0.4874291, 0.7383247),
+ Vectors.dense(0.5757814, -0.3053884), Vectors.dense(1.5117812, 0.3898432),
+ Vectors.dense(-0.6212406, -2.2146999), Vectors.dense(11.1249309, 9.9550664),
+ Vectors.dense(9.9838097, 10.9438362), Vectors.dense(10.8212212, 10.5939013),
+ Vectors.dense(10.9189774, 10.7821363), Vectors.dense(10.0745650, 8.0106483),
+ Vectors.dense(10.6198257, 9.9438713), Vectors.dense(9.8442045, 8.5292476),
+ Vectors.dense(9.5218499, 10.4179416)
+ )
+
+ case class FeatureData(features: Vector)
+
+ def modelEquals(m1: GaussianMixtureModel, m2: GaussianMixtureModel): Unit = {
+ assert(m1.weights.length === m2.weights.length)
+ for (i <- m1.weights.indices) {
+ assert(m1.weights(i) ~== m2.weights(i) absTol 1E-3)
+ assert(m1.gaussians(i).mean ~== m2.gaussians(i).mean absTol 1E-3)
+ assert(m1.gaussians(i).cov ~== m2.gaussians(i).cov absTol 1E-3)
+ }
+ }
}