aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwm624@hotmail.com <wm624@hotmail.com>2017-01-07 11:07:49 -0800
committerJoseph K. Bradley <joseph@databricks.com>2017-01-07 11:07:49 -0800
commit036b50347c56a3541c526b1270093163b9b79e45 (patch)
tree94f6a8243b7ae919d74a09433c8e8ecbe6aeda68
parentb3d39620c563e5f6a32a4082aa3908e1009c17d2 (diff)
downloadspark-036b50347c56a3541c526b1270093163b9b79e45.tar.gz
spark-036b50347c56a3541c526b1270093163b9b79e45.tar.bz2
spark-036b50347c56a3541c526b1270093163b9b79e45.zip
[SPARK-19110][ML][MLLIB] DistributedLDAModel returns different logPrior for original and loaded model
## What changes were proposed in this pull request? While adding DistributedLDAModel training summary for SparkR, I found that the logPrior for original and loaded model is different. For example, in the test("read/write DistributedLDAModel"), I add the test: val logPrior = model.asInstanceOf[DistributedLDAModel].logPrior val logPrior2 = model2.asInstanceOf[DistributedLDAModel].logPrior assert(logPrior === logPrior2) The test fails: -4.394180878889078 did not equal -4.294290536919573 The reason is that `graph.vertices.aggregate(0.0)(seqOp, _ + _)` only returns the value of a single vertex instead of the aggregation of all vertices. Therefore, when the loaded model does the aggregation in a different order, it returns different `logPrior`. Please refer to #16464 for details. ## How was this patch tested? Add a new unit test for testing logPrior. Author: wm624@hotmail.com <wm624@hotmail.com> Closes #16491 from wangmiao1981/ldabug.
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala4
-rw-r--r--mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala8
2 files changed, 10 insertions, 2 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
index 933a5f1d52..ae33698209 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -745,12 +745,12 @@ class DistributedLDAModel private[clustering] (
val N_wk = vertex._2
val smoothed_N_wk: TopicCounts = N_wk + (eta - 1.0)
val phi_wk: TopicCounts = smoothed_N_wk :/ smoothed_N_k
- (eta - 1.0) * sum(phi_wk.map(math.log))
+ sumPrior + (eta - 1.0) * sum(phi_wk.map(math.log))
} else {
val N_kj = vertex._2
val smoothed_N_kj: TopicCounts = N_kj + (alpha - 1.0)
val theta_kj: TopicCounts = normalize(smoothed_N_kj, 1.0)
- (alpha - 1.0) * sum(theta_kj.map(math.log))
+ sumPrior + (alpha - 1.0) * sum(theta_kj.map(math.log))
}
}
graph.vertices.aggregate(0.0)(seqOp, _ + _)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
index 3f39deddf2..9aa11fbdbe 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
@@ -260,6 +260,14 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
Vectors.dense(model2.topicsMatrix.toArray) absTol 1e-6)
assert(Vectors.dense(model.getDocConcentration) ~==
Vectors.dense(model2.getDocConcentration) absTol 1e-6)
+ val logPrior = model.asInstanceOf[DistributedLDAModel].logPrior
+ val logPrior2 = model2.asInstanceOf[DistributedLDAModel].logPrior
+ val trainingLogLikelihood =
+ model.asInstanceOf[DistributedLDAModel].trainingLogLikelihood
+ val trainingLogLikelihood2 =
+ model2.asInstanceOf[DistributedLDAModel].trainingLogLikelihood
+ assert(logPrior ~== logPrior2 absTol 1e-6)
+ assert(trainingLogLikelihood ~== trainingLogLikelihood2 absTol 1e-6)
}
val lda = new LDA()
testEstimatorAndModelReadWrite(lda, dataset,