aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorFeynman Liang <fliang@databricks.com>2015-07-30 13:17:54 -0700
committerJoseph K. Bradley <joseph@databricks.com>2015-07-30 13:17:54 -0700
commitd8cfd531c7c50c9b00ab546be458f44f84c386ae (patch)
treef7af8958a3c038ffedc094695eb20c5c41322262 /mllib
parenta20e743fb863de809863652931bc982aac2d1f86 (diff)
downloadspark-d8cfd531c7c50c9b00ab546be458f44f84c386ae.tar.gz
spark-d8cfd531c7c50c9b00ab546be458f44f84c386ae.tar.bz2
spark-d8cfd531c7c50c9b00ab546be458f44f84c386ae.zip
[SPARK-5567] [MLLIB] Add predict method to LocalLDAModel
jkbradley hhbyyh Adds `topicDistributions` to LocalLDAModel. Please review after #7757 is merged. Author: Feynman Liang <fliang@databricks.com> Closes #7760 from feynmanliang/SPARK-5567-predict-in-LDA and squashes the following commits: 0ad1134 [Feynman Liang] Remove println 27b3877 [Feynman Liang] Code review fixes 6bfb87c [Feynman Liang] Remove extra newline 476f788 [Feynman Liang] Fix checks and doc for variationalInference 061780c [Feynman Liang] Code review cleanup 3be2947 [Feynman Liang] Rename topicDistribution -> topicDistributions 2a821a6 [Feynman Liang] Add predict methods to LocalLDAModel
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala42
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala5
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala63
3 files changed, 102 insertions, 8 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
index ece28848aa..6cfad3fbbd 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -186,7 +186,6 @@ abstract class LDAModel private[clustering] extends Saveable {
* This model stores only the inferred topics.
* It may be used for computing topics for new documents, but it may give less accurate answers
* than the [[DistributedLDAModel]].
- *
* @param topics Inferred topics (vocabSize x k matrix).
*/
@Experimental
@@ -221,9 +220,6 @@ class LocalLDAModel private[clustering] (
// TODO
// override def logLikelihood(documents: RDD[(Long, Vector)]): Double = ???
- // TODO:
- // override def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = ???
-
/**
* Calculate the log variational bound on perplexity. See Equation (16) in original Online
* LDA paper.
@@ -269,7 +265,7 @@ class LocalLDAModel private[clustering] (
// by topic (columns of lambda)
val Elogbeta = LDAUtils.dirichletExpectation(lambda.t).t
- var score = documents.filter(_._2.numActives > 0).map { case (id: Long, termCounts: Vector) =>
+ var score = documents.filter(_._2.numNonzeros > 0).map { case (id: Long, termCounts: Vector) =>
var docScore = 0.0D
val (gammad: BDV[Double], _) = OnlineLDAOptimizer.variationalTopicInference(
termCounts, exp(Elogbeta), brzAlpha, gammaShape, k)
@@ -277,7 +273,7 @@ class LocalLDAModel private[clustering] (
// E[log p(doc | theta, beta)]
termCounts.foreachActive { case (idx, count) =>
- docScore += LDAUtils.logSumExp(Elogthetad + Elogbeta(idx, ::).t)
+ docScore += count * LDAUtils.logSumExp(Elogthetad + Elogbeta(idx, ::).t)
}
// E[log p(theta | alpha) - log q(theta | gamma)]; assumes alpha is a vector
docScore += sum((brzAlpha - gammad) :* Elogthetad)
@@ -297,6 +293,40 @@ class LocalLDAModel private[clustering] (
score
}
+ /**
+ * Predicts the topic mixture distribution for each document (often called "theta" in the
+ * literature). Returns a vector of zeros for an empty document.
+ *
+ * This uses a variational approximation following Hoffman et al. (2010), where the approximate
+ * distribution is called "gamma." Technically, this method returns this approximation "gamma"
+ * for each document.
+ * @param documents documents to predict topic mixture distributions for
+ * @return An RDD of (document ID, topic mixture distribution for document)
+ */
+ // TODO: declare in LDAModel and override once implemented in DistributedLDAModel
+ def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = {
+ // Double transpose because dirichletExpectation normalizes by row and we need to normalize
+ // by topic (columns of lambda)
+ val expElogbeta = exp(LDAUtils.dirichletExpectation(topicsMatrix.toBreeze.toDenseMatrix.t).t)
+ val docConcentrationBrz = this.docConcentration.toBreeze
+ val gammaShape = this.gammaShape
+ val k = this.k
+
+ documents.map { case (id: Long, termCounts: Vector) =>
+ if (termCounts.numNonzeros == 0) {
+ (id, Vectors.zeros(k))
+ } else {
+ val (gamma, _) = OnlineLDAOptimizer.variationalTopicInference(
+ termCounts,
+ expElogbeta,
+ docConcentrationBrz,
+ gammaShape,
+ k)
+ (id, Vectors.dense(normalize(gamma, 1.0).toArray))
+ }
+ }
+ }
+
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index 4b90fbdf0c..9dbec41efe 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -394,7 +394,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
val gammaShape = this.gammaShape
val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
- val nonEmptyDocs = docs.filter(_._2.numActives > 0)
+ val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
val stat = BDM.zeros[Double](k, vocabSize)
var gammaPart = List[BDV[Double]]()
@@ -461,7 +461,8 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
private[clustering] object OnlineLDAOptimizer {
/**
* Uses variational inference to infer the topic distribution `gammad` given the term counts
- * for a document. `termCounts` must be non-empty, otherwise Breeze will throw a BLAS error.
+ * for a document. `termCounts` must contain at least one non-zero entry, otherwise Breeze will
+ * throw a BLAS error.
*
* An optimization (Lee, Seung: Algorithms for non-negative matrix factorization, NIPS 2001)
* avoids explicit computation of variational parameter `phi`.
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
index 61d2edfd9f..d74482d3a7 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
@@ -242,6 +242,7 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
val alpha = 0.01
val eta = 0.01
val gammaShape = 100
+ // obtained from LDA model trained in gensim, see below
val topics = new DenseMatrix(numRows = vocabSize, numCols = k, values = Array(
1.86738052, 1.94056535, 1.89981687, 0.0833265, 0.07405918, 0.07940597,
0.15081551, 0.08637973, 0.12428538, 1.9474897, 1.94615165, 1.95204124))
@@ -281,6 +282,68 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
assert(ldaModel.logPerplexity(docs) ~== -3.690D relTol 1E-3D)
}
+ test("LocalLDAModel predict") {
+ val k = 2
+ val vocabSize = 6
+ val alpha = 0.01
+ val eta = 0.01
+ val gammaShape = 100
+ // obtained from LDA model trained in gensim, see below
+ val topics = new DenseMatrix(numRows = vocabSize, numCols = k, values = Array(
+ 1.86738052, 1.94056535, 1.89981687, 0.0833265, 0.07405918, 0.07940597,
+ 0.15081551, 0.08637973, 0.12428538, 1.9474897, 1.94615165, 1.95204124))
+
+ def toydata: Array[(Long, Vector)] = Array(
+ Vectors.sparse(6, Array(0, 1), Array(1, 1)),
+ Vectors.sparse(6, Array(1, 2), Array(1, 1)),
+ Vectors.sparse(6, Array(0, 2), Array(1, 1)),
+ Vectors.sparse(6, Array(3, 4), Array(1, 1)),
+ Vectors.sparse(6, Array(3, 5), Array(1, 1)),
+ Vectors.sparse(6, Array(4, 5), Array(1, 1))
+ ).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) }
+ val docs = sc.parallelize(toydata)
+
+ val ldaModel: LocalLDAModel = new LocalLDAModel(
+ topics, Vectors.dense(Array.fill(k)(alpha)), eta, gammaShape)
+
+ /* Verify results using gensim:
+ import numpy as np
+ from gensim import models
+ corpus = [
+ [(0, 1.0), (1, 1.0)],
+ [(1, 1.0), (2, 1.0)],
+ [(0, 1.0), (2, 1.0)],
+ [(3, 1.0), (4, 1.0)],
+ [(3, 1.0), (5, 1.0)],
+ [(4, 1.0), (5, 1.0)]]
+ np.random.seed(2345)
+ lda = models.ldamodel.LdaModel(
+ corpus=corpus, alpha=0.01, eta=0.01, num_topics=2, update_every=0, passes=100,
+ decay=0.51, offset=1024)
+ print(list(lda.get_document_topics(corpus)))
+ > [[(0, 0.99504950495049516)], [(0, 0.99504950495049516)],
+ > [(0, 0.99504950495049516)], [(1, 0.99504950495049516)],
+ > [(1, 0.99504950495049516)], [(1, 0.99504950495049516)]]
+ */
+
+ val expectedPredictions = List(
+ (0, 0.99504), (0, 0.99504),
+ (0, 0.99504), (1, 0.99504),
+ (1, 0.99504), (1, 0.99504))
+
+ val actualPredictions = ldaModel.topicDistributions(docs).map { case (id, topics) =>
+ // convert results to expectedPredictions format, which only has highest probability topic
+ val topicsBz = topics.toBreeze.toDenseVector
+ (id, (argmax(topicsBz), max(topicsBz)))
+ }.sortByKey()
+ .values
+ .collect()
+
+ expectedPredictions.zip(actualPredictions).forall { case (expected, actual) =>
+ expected._1 === actual._1 && (expected._2 ~== actual._2 relTol 1E-3D)
+ }
+ }
+
test("OnlineLDAOptimizer with asymmetric prior") {
def toydata: Array[(Long, Vector)] = Array(
Vectors.sparse(6, Array(0, 1), Array(1, 1)),