aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorYuhao Yang <hhbyyh@gmail.com>2016-01-11 14:55:44 -0800
committerJoseph K. Bradley <joseph@databricks.com>2016-01-11 14:55:44 -0800
commitbbea88852ce6a3127d071ca40dbca2d042f9fbcf (patch)
tree7aab3ab1a41d5589c6ac7bbf2b3c3b7d7c647512 /mllib
parent4f8eefa36bb90812aac61ac7a762c9452de666bf (diff)
downloadspark-bbea88852ce6a3127d071ca40dbca2d042f9fbcf.tar.gz
spark-bbea88852ce6a3127d071ca40dbca2d042f9fbcf.tar.bz2
spark-bbea88852ce6a3127d071ca40dbca2d042f9fbcf.zip
[SPARK-10809][MLLIB] Single-document topicDistributions method for LocalLDAModel
jira: https://issues.apache.org/jira/browse/SPARK-10809 We could provide a single-document topicDistributions method for LocalLDAModel to allow for quick queries which avoid RDD operations. Currently, the user must use an RDD of documents. add some missing assert too. Author: Yuhao Yang <hhbyyh@gmail.com> Closes #9484 from hhbyyh/ldaTopicPre.
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala26
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala15
2 files changed, 38 insertions, 3 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
index 2fce3ff641..b30ecb8020 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -388,6 +388,32 @@ class LocalLDAModel private[spark] (
}
/**
+ * Predicts the topic mixture distribution for a document (often called "theta" in the
+ * literature). Returns a vector of zeros for an empty document.
+ *
+ * Note this means to allow quick query for single document. For batch documents, please refer
+ * to [[topicDistributions()]] to avoid overhead.
+ *
+ * @param document document to predict topic mixture distributions for
+ * @return topic mixture distribution for the document
+ */
+ @Since("2.0.0")
+ def topicDistribution(document: Vector): Vector = {
+ val expElogbeta = exp(LDAUtils.dirichletExpectation(topicsMatrix.toBreeze.toDenseMatrix.t).t)
+ if (document.numNonzeros == 0) {
+ Vectors.zeros(this.k)
+ } else {
+ val (gamma, _) = OnlineLDAOptimizer.variationalTopicInference(
+ document,
+ expElogbeta,
+ this.docConcentration.toBreeze,
+ gammaShape,
+ this.k)
+ Vectors.dense(normalize(gamma, 1.0).toArray)
+ }
+ }
+
+ /**
* Java-friendly version of [[topicDistributions]]
*/
@Since("1.4.1")
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
index faef60e084..ea23196d2c 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
@@ -366,7 +366,8 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
(0, 0.99504), (1, 0.99504),
(1, 0.99504), (1, 0.99504))
- val actualPredictions = ldaModel.topicDistributions(docs).map { case (id, topics) =>
+ val actualPredictions = ldaModel.topicDistributions(docs).cache()
+ val topTopics = actualPredictions.map { case (id, topics) =>
// convert results to expectedPredictions format, which only has highest probability topic
val topicsBz = topics.toBreeze.toDenseVector
(id, (argmax(topicsBz), max(topicsBz)))
@@ -374,9 +375,17 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
.values
.collect()
- expectedPredictions.zip(actualPredictions).forall { case (expected, actual) =>
- expected._1 === actual._1 && (expected._2 ~== actual._2 relTol 1E-3D)
+ expectedPredictions.zip(topTopics).foreach { case (expected, actual) =>
+ assert(expected._1 === actual._1 && (expected._2 ~== actual._2 relTol 1E-3D))
}
+
+ docs.collect()
+ .map(doc => ldaModel.topicDistribution(doc._2))
+ .zip(actualPredictions.map(_._2).collect())
+ .foreach { case (single, batch) =>
+ assert(single ~== batch relTol 1E-3D)
+ }
+ actualPredictions.unpersist()
}
test("OnlineLDAOptimizer with asymmetric prior") {