author     Feynman Liang <fliang@databricks.com>    2015-07-31 12:12:22 -0700
committer  Joseph K. Bradley <joseph@databricks.com>    2015-07-31 12:12:22 -0700
commit     a8340fa7df17e3f0a3658f8b8045ab840845a72a (patch)
tree       25ba59f08c4976cbf2d640cea9ff1888b3030a16 /mllib
parent     d04634701413410938a133358fe1d9fbc077645e (diff)
[SPARK-9481] Add logLikelihood to LocalLDAModel
jkbradley Exposes `bound` (variational log likelihood bound) through public API as `logLikelihood`. Also adds unit tests, some DRYing of `LDASuite`, and includes unit tests mentioned in #7760

Author: Feynman Liang <fliang@databricks.com>

Closes #7801 from feynmanliang/SPARK-9481-logLikelihood and squashes the following commits:

6d1b2c9 [Feynman Liang] Negate perplexity definition
5f62b20 [Feynman Liang] Add logLikelihood
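For illustration, a minimal sketch (not part of the patch) of how the newly exposed methods could be called; `sc` is assumed to be an existing SparkContext and the tiny corpus here is a hypothetical placeholder:

import org.apache.spark.mllib.clustering.{LDA, LocalLDAModel, OnlineLDAOptimizer}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

// (document id, term-count vector) pairs over a 6-word vocabulary
val corpus: RDD[(Long, Vector)] = sc.parallelize(Seq(
  (0L, Vectors.sparse(6, Array(0, 1), Array(1.0, 1.0))),
  (1L, Vectors.sparse(6, Array(3, 4), Array(1.0, 1.0)))))

// The online optimizer produces a LocalLDAModel
val model = new LDA()
  .setK(2)
  .setOptimizer(new OnlineLDAOptimizer().setMiniBatchFraction(1.0))
  .run(corpus)
  .asInstanceOf[LocalLDAModel]

// New in this patch: variational lower bound on the corpus log likelihood
val ll = model.logLikelihood(corpus)
// Upper bound on log perplexity per word; equals -logLikelihood / total token count
val lp = model.logPerplexity(corpus)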
Diffstat (limited to 'mllib')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala   20
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala  129
2 files changed, 78 insertions, 71 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
index 82281a0daf..ff7035d224 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -217,22 +217,28 @@ class LocalLDAModel private[clustering] (
LocalLDAModel.SaveLoadV1_0.save(sc, path, topicsMatrix, docConcentration, topicConcentration,
gammaShape)
}
- // TODO
- // override def logLikelihood(documents: RDD[(Long, Vector)]): Double = ???
+
+ // TODO: declare in LDAModel and override once implemented in DistributedLDAModel
+ /**
+ * Calculates a lower bound on the log likelihood of the entire corpus.
+ * @param documents test corpus to use for calculating log likelihood
+ * @return variational lower bound on the log likelihood of the entire corpus
+ */
+ def logLikelihood(documents: RDD[(Long, Vector)]): Double = bound(documents,
+ docConcentration, topicConcentration, topicsMatrix.toBreeze.toDenseMatrix, gammaShape, k,
+ vocabSize)
/**
- * Calculate the log variational bound on perplexity. See Equation (16) in original Online
+ * Calculate an upper bound on perplexity. See Equation (16) in original Online
* LDA paper.
* @param documents test corpus to use for calculating perplexity
- * @return the log perplexity per word
+ * @return variational upper bound on log perplexity per word
*/
def logPerplexity(documents: RDD[(Long, Vector)]): Double = {
val corpusWords = documents
.map { case (_, termCounts) => termCounts.toArray.sum }
.sum()
- val batchVariationalBound = bound(documents, docConcentration,
- topicConcentration, topicsMatrix.toBreeze.toDenseMatrix, gammaShape, k, vocabSize)
- val perWordBound = batchVariationalBound / corpusWords
+ val perWordBound = -logLikelihood(documents) / corpusWords
perWordBound
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
index 695ee3b82e..79d2a1cafd 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
@@ -210,16 +210,7 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
}
test("OnlineLDAOptimizer with toy data") {
- def toydata: Array[(Long, Vector)] = Array(
- Vectors.sparse(6, Array(0, 1), Array(1, 1)),
- Vectors.sparse(6, Array(1, 2), Array(1, 1)),
- Vectors.sparse(6, Array(0, 2), Array(1, 1)),
- Vectors.sparse(6, Array(3, 4), Array(1, 1)),
- Vectors.sparse(6, Array(3, 5), Array(1, 1)),
- Vectors.sparse(6, Array(4, 5), Array(1, 1))
- ).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) }
-
- val docs = sc.parallelize(toydata)
+ val docs = sc.parallelize(toyData)
val op = new OnlineLDAOptimizer().setMiniBatchFraction(1).setTau0(1024).setKappa(0.51)
.setGammaShape(1e10)
val lda = new LDA().setK(2)
@@ -242,30 +233,45 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
}
}
- test("LocalLDAModel logPerplexity") {
- val k = 2
- val vocabSize = 6
- val alpha = 0.01
- val eta = 0.01
- val gammaShape = 100
- // obtained from LDA model trained in gensim, see below
- val topics = new DenseMatrix(numRows = vocabSize, numCols = k, values = Array(
- 1.86738052, 1.94056535, 1.89981687, 0.0833265, 0.07405918, 0.07940597,
- 0.15081551, 0.08637973, 0.12428538, 1.9474897, 1.94615165, 1.95204124))
+ test("LocalLDAModel logLikelihood") {
+ val ldaModel: LocalLDAModel = toyModel
- def toydata: Array[(Long, Vector)] = Array(
- Vectors.sparse(6, Array(0, 1), Array(1, 1)),
- Vectors.sparse(6, Array(1, 2), Array(1, 1)),
- Vectors.sparse(6, Array(0, 2), Array(1, 1)),
- Vectors.sparse(6, Array(3, 4), Array(1, 1)),
- Vectors.sparse(6, Array(3, 5), Array(1, 1)),
- Vectors.sparse(6, Array(4, 5), Array(1, 1))
- ).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) }
- val docs = sc.parallelize(toydata)
+ val docsSingleWord = sc.parallelize(Array(Vectors.sparse(6, Array(0), Array(1)))
+ .zipWithIndex
+ .map { case (wordCounts, docId) => (docId.toLong, wordCounts) })
+ val docsRepeatedWord = sc.parallelize(Array(Vectors.sparse(6, Array(0), Array(5)))
+ .zipWithIndex
+ .map { case (wordCounts, docId) => (docId.toLong, wordCounts) })
+ /* Verify results using gensim:
+ import numpy as np
+ from gensim import models
+ corpus = [
+ [(0, 1.0), (1, 1.0)],
+ [(1, 1.0), (2, 1.0)],
+ [(0, 1.0), (2, 1.0)],
+ [(3, 1.0), (4, 1.0)],
+ [(3, 1.0), (5, 1.0)],
+ [(4, 1.0), (5, 1.0)]]
+ np.random.seed(2345)
+ lda = models.ldamodel.LdaModel(
+ corpus=corpus, alpha=0.01, eta=0.01, num_topics=2, update_every=0, passes=100,
+ decay=0.51, offset=1024)
+ docsSingleWord = [[(0, 1.0)]]
+ docsRepeatedWord = [[(0, 5.0)]]
+ print(lda.bound(docsSingleWord))
+ > -25.9706969833
+ print(lda.bound(docsRepeatedWord))
+ > -31.4413908227
+ */
- val ldaModel: LocalLDAModel = new LocalLDAModel(
- topics, Vectors.dense(Array.fill(k)(alpha)), eta, gammaShape)
+ assert(ldaModel.logLikelihood(docsSingleWord) ~== -25.971 relTol 1E-3D)
+ assert(ldaModel.logLikelihood(docsRepeatedWord) ~== -31.441 relTol 1E-3D)
+ }
+
+ test("LocalLDAModel logPerplexity") {
+ val docs = sc.parallelize(toyData)
+ val ldaModel: LocalLDAModel = toyModel
/* Verify results using gensim:
import numpy as np
@@ -285,32 +291,13 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
> -3.69051285096
*/
- assert(ldaModel.logPerplexity(docs) ~== -3.690D relTol 1E-3D)
+ // Gensim's definition of perplexity is the negative of our (and Stanford NLP's) definition
+ assert(ldaModel.logPerplexity(docs) ~== 3.690D relTol 1E-3D)
}
test("LocalLDAModel predict") {
- val k = 2
- val vocabSize = 6
- val alpha = 0.01
- val eta = 0.01
- val gammaShape = 100
- // obtained from LDA model trained in gensim, see below
- val topics = new DenseMatrix(numRows = vocabSize, numCols = k, values = Array(
- 1.86738052, 1.94056535, 1.89981687, 0.0833265, 0.07405918, 0.07940597,
- 0.15081551, 0.08637973, 0.12428538, 1.9474897, 1.94615165, 1.95204124))
-
- def toydata: Array[(Long, Vector)] = Array(
- Vectors.sparse(6, Array(0, 1), Array(1, 1)),
- Vectors.sparse(6, Array(1, 2), Array(1, 1)),
- Vectors.sparse(6, Array(0, 2), Array(1, 1)),
- Vectors.sparse(6, Array(3, 4), Array(1, 1)),
- Vectors.sparse(6, Array(3, 5), Array(1, 1)),
- Vectors.sparse(6, Array(4, 5), Array(1, 1))
- ).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) }
- val docs = sc.parallelize(toydata)
-
- val ldaModel: LocalLDAModel = new LocalLDAModel(
- topics, Vectors.dense(Array.fill(k)(alpha)), eta, gammaShape)
+ val docs = sc.parallelize(toyData)
+ val ldaModel: LocalLDAModel = toyModel
/* Verify results using gensim:
import numpy as np
@@ -351,16 +338,7 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
}
test("OnlineLDAOptimizer with asymmetric prior") {
- def toydata: Array[(Long, Vector)] = Array(
- Vectors.sparse(6, Array(0, 1), Array(1, 1)),
- Vectors.sparse(6, Array(1, 2), Array(1, 1)),
- Vectors.sparse(6, Array(0, 2), Array(1, 1)),
- Vectors.sparse(6, Array(3, 4), Array(1, 1)),
- Vectors.sparse(6, Array(3, 5), Array(1, 1)),
- Vectors.sparse(6, Array(4, 5), Array(1, 1))
- ).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) }
-
- val docs = sc.parallelize(toydata)
+ val docs = sc.parallelize(toyData)
val op = new OnlineLDAOptimizer().setMiniBatchFraction(1).setTau0(1024).setKappa(0.51)
.setGammaShape(1e10)
val lda = new LDA().setK(2)
@@ -531,4 +509,27 @@ private[clustering] object LDASuite {
def getNonEmptyDoc(corpus: Array[(Long, Vector)]): Array[(Long, Vector)] = corpus.filter {
case (_, wc: Vector) => Vectors.norm(wc, p = 1.0) != 0.0
}
+
+ def toyData: Array[(Long, Vector)] = Array(
+ Vectors.sparse(6, Array(0, 1), Array(1, 1)),
+ Vectors.sparse(6, Array(1, 2), Array(1, 1)),
+ Vectors.sparse(6, Array(0, 2), Array(1, 1)),
+ Vectors.sparse(6, Array(3, 4), Array(1, 1)),
+ Vectors.sparse(6, Array(3, 5), Array(1, 1)),
+ Vectors.sparse(6, Array(4, 5), Array(1, 1))
+ ).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) }
+
+ def toyModel: LocalLDAModel = {
+ val k = 2
+ val vocabSize = 6
+ val alpha = 0.01
+ val eta = 0.01
+ val gammaShape = 100
+ val topics = new DenseMatrix(numRows = vocabSize, numCols = k, values = Array(
+ 1.86738052, 1.94056535, 1.89981687, 0.0833265, 0.07405918, 0.07940597,
+ 0.15081551, 0.08637973, 0.12428538, 1.9474897, 1.94615165, 1.95204124))
+ val ldaModel: LocalLDAModel = new LocalLDAModel(
+ topics, Vectors.dense(Array.fill(k)(alpha)), eta, gammaShape)
+ ldaModel
+ }
}
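
As a quick cross-check of the sign convention introduced above, a hedged sketch (not part of the patch) that could run inside LDASuite against the shared helpers; the tolerance is arbitrary and `sc` is the suite's SparkContext:

val docs = sc.parallelize(LDASuite.toyData)
val model = LDASuite.toyModel
// Total number of tokens in the toy corpus (each of the 6 documents has 2 words)
val totalTokens = docs.map { case (_, counts) => counts.toArray.sum }.sum()
// logPerplexity is defined as -logLikelihood / totalTokens, so it should be positive here
assert(math.abs(model.logPerplexity(docs) + model.logLikelihood(docs) / totalTokens) < 1e-6)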