author    Feynman Liang <fliang@databricks.com>  2015-07-22 15:07:05 -0700
committer Joseph K. Bradley <joseph@databricks.com>  2015-07-22 15:07:05 -0700
commit    1aca9c13c144fa336af6afcfa666128bf77c49d4 (patch)
tree      2aa5166485e6b1611e0aac216fbb3530003b718c
parent    cf21d05f8b5fae52b118fb8846f43d6fda1aea41 (diff)
[SPARK-8536] [MLLIB] Generalize OnlineLDAOptimizer to asymmetric document-topic Dirichlet priors
Modify `LDA` to take asymmetric document-topic prior distributions and `OnlineLDAOptimizer` to use the asymmetric prior during variational inference.

This PR only generalizes `OnlineLDAOptimizer` and the associated `LocalLDAModel`; `EMLDAOptimizer` and `DistributedLDAModel` still only support symmetric `alpha` (checked during `EMLDAOptimizer.initialize`).

Author: Feynman Liang <fliang@databricks.com>

Closes #7575 from feynmanliang/SPARK-8536-LDA-asymmetric-priors and squashes the following commits:

af8fbb7 [Feynman Liang] Fix merge errors
ef5821d [Feynman Liang] Merge remote-tracking branch 'apache/master' into SPARK-8536-LDA-asymmetric-priors
58f1d7b [Feynman Liang] Fix from review feedback
a6dcf70 [Feynman Liang] Change docConcentration interface and move LDAOptimizer validation to initialize, add sad path tests
72038ff [Feynman Liang] Add tests referenced against gensim
d4284fa [Feynman Liang] Generalize OnlineLDA to asymmetric priors, no tests
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala           | 49
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala  | 27
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala      | 82
3 files changed, 126 insertions(+), 32 deletions(-)
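As a usage note for the change described above: docConcentration now accepts a length-k vector, and after this patch only the online optimizer accepts an asymmetric one. A minimal sketch, assuming a corpus: RDD[(Long, Vector)] that is not shown here:

  import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer}
  import org.apache.spark.mllib.linalg.Vectors

  val lda = new LDA()
    .setK(3)
    .setDocConcentration(Vectors.dense(0.1, 0.2, 0.7)) // asymmetric alpha, length k
    .setOptimizer(new OnlineLDAOptimizer())
  val model = lda.run(corpus) // the online optimizer produces a LocalLDAModel
  // The same asymmetric vector would be rejected by EMLDAOptimizer.initialize.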
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
index a410547a72..ab124e6d77 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
@@ -23,11 +23,10 @@ import org.apache.spark.Logging
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.JavaPairRDD
import org.apache.spark.graphx._
-import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils
-
/**
* :: Experimental ::
*
@@ -49,14 +48,15 @@ import org.apache.spark.util.Utils
class LDA private (
private var k: Int,
private var maxIterations: Int,
- private var docConcentration: Double,
+ private var docConcentration: Vector,
private var topicConcentration: Double,
private var seed: Long,
private var checkpointInterval: Int,
private var ldaOptimizer: LDAOptimizer) extends Logging {
- def this() = this(k = 10, maxIterations = 20, docConcentration = -1, topicConcentration = -1,
- seed = Utils.random.nextLong(), checkpointInterval = 10, ldaOptimizer = new EMLDAOptimizer)
+ def this() = this(k = 10, maxIterations = 20, docConcentration = Vectors.dense(-1),
+ topicConcentration = -1, seed = Utils.random.nextLong(), checkpointInterval = 10,
+ ldaOptimizer = new EMLDAOptimizer)
/**
* Number of topics to infer. I.e., the number of soft cluster centers.
@@ -77,37 +77,50 @@ class LDA private (
* Concentration parameter (commonly named "alpha") for the prior placed on documents'
* distributions over topics ("theta").
*
- * This is the parameter to a symmetric Dirichlet distribution.
+ * This is the parameter to a Dirichlet distribution.
*/
- def getDocConcentration: Double = this.docConcentration
+ def getDocConcentration: Vector = this.docConcentration
/**
* Concentration parameter (commonly named "alpha") for the prior placed on documents'
* distributions over topics ("theta").
*
- * This is the parameter to a symmetric Dirichlet distribution, where larger values
- * mean more smoothing (more regularization).
+ * This is the parameter to a Dirichlet distribution, where larger values mean more smoothing
+ * (more regularization).
*
- * If set to -1, then docConcentration is set automatically.
- * (default = -1 = automatic)
+ * If set to a singleton vector Vector(-1), then docConcentration is set automatically. If set to
+ * singleton vector Vector(t) where t != -1, then t is replicated to a vector of length k during
+ * [[LDAOptimizer.initialize()]]. Otherwise, the [[docConcentration]] vector must be length k.
+ * (default = Vector(-1) = automatic)
*
* Optimizer-specific parameter settings:
* - EM
- * - Value should be > 1.0
- * - default = (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows
- * Asuncion et al. (2009), who recommend a +1 adjustment for EM.
+ * - Currently only supports symmetric distributions, so all values in the vector should be
+ * the same.
+ * - Values should be > 1.0
+ * - default = uniformly (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows
+ * from Asuncion et al. (2009), who recommend a +1 adjustment for EM.
* - Online
- * - Value should be >= 0
- * - default = (1.0 / k), following the implementation from
+ * - Values should be >= 0
+ * - default = uniformly (1.0 / k), following the implementation from
* [[https://github.com/Blei-Lab/onlineldavb]].
*/
- def setDocConcentration(docConcentration: Double): this.type = {
+ def setDocConcentration(docConcentration: Vector): this.type = {
this.docConcentration = docConcentration
this
}
+ /** Replicates Double to create a symmetric prior */
+ def setDocConcentration(docConcentration: Double): this.type = {
+ this.docConcentration = Vectors.dense(docConcentration)
+ this
+ }
+
/** Alias for [[getDocConcentration]] */
- def getAlpha: Double = getDocConcentration
+ def getAlpha: Vector = getDocConcentration
+
+ /** Alias for [[setDocConcentration()]] */
+ def setAlpha(alpha: Vector): this.type = setDocConcentration(alpha)
/** Alias for [[setDocConcentration()]] */
def setAlpha(alpha: Double): this.type = setDocConcentration(alpha)
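The two overloads above differ only in when replication happens: the Double form stores a singleton vector that is expanded to length k later, in LDAOptimizer.initialize(). A minimal sketch, assuming k = 2:

  import org.apache.spark.mllib.linalg.Vectors

  val symmetric = new LDA().setK(2).setAlpha(0.5)
  assert(symmetric.getAlpha.size == 1) // still Vector(0.5) until initialize()

  val asymmetric = new LDA().setK(2).setAlpha(Vectors.dense(0.2, 0.8))
  assert(asymmetric.getAlpha.toArray.sameElements(Array(0.2, 0.8)))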
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index b960ae6c07..f4170a3d98 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -27,7 +27,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl.GraphImpl
import org.apache.spark.mllib.impl.PeriodicGraphCheckpointer
-import org.apache.spark.mllib.linalg.{DenseVector, Matrices, SparseVector, Vector}
+import org.apache.spark.mllib.linalg.{DenseVector, Matrices, SparseVector, Vector, Vectors}
import org.apache.spark.rdd.RDD
/**
@@ -95,8 +95,11 @@ final class EMLDAOptimizer extends LDAOptimizer {
* Compute bipartite term/doc graph.
*/
override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
- val docConcentration = lda.getDocConcentration
+ val docConcentration = lda.getDocConcentration(0)
+ require({
+ lda.getDocConcentration.toArray.forall(_ == docConcentration)
+ }, "EMLDAOptimizer currently only supports symmetric document-topic priors")
val topicConcentration = lda.getTopicConcentration
val k = lda.getK
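The guard above only checks that all entries of the prior are equal; a sketch of both outcomes, assuming an in-package caller (initialize is private[clustering]) and an assumed corpus RDD:

  new EMLDAOptimizer().initialize(corpus,
    new LDA().setK(2).setAlpha(Vectors.dense(2.0, 2.0))) // passes: all entries equal
  new EMLDAOptimizer().initialize(corpus,
    new LDA().setK(2).setAlpha(Vectors.dense(1.0, 2.0))) // throws IllegalArgumentException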
@@ -229,10 +232,10 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
private var vocabSize: Int = 0
/** alias for docConcentration */
- private var alpha: Double = 0
+ private var alpha: Vector = Vectors.dense(0)
/** (private[clustering] for debugging) Get docConcentration */
- private[clustering] def getAlpha: Double = alpha
+ private[clustering] def getAlpha: Vector = alpha
/** alias for topicConcentration */
private var eta: Double = 0
@@ -343,7 +346,19 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
this.k = lda.getK
this.corpusSize = docs.count()
this.vocabSize = docs.first()._2.size
- this.alpha = if (lda.getDocConcentration == -1) 1.0 / k else lda.getDocConcentration
+ this.alpha = if (lda.getDocConcentration.size == 1) {
+ if (lda.getDocConcentration(0) == -1) Vectors.dense(Array.fill(k)(1.0 / k))
+ else {
+ require(lda.getDocConcentration(0) >= 0, s"all entries in alpha must be >=0, got: $alpha")
+ Vectors.dense(Array.fill(k)(lda.getDocConcentration(0)))
+ }
+ } else {
+ require(lda.getDocConcentration.size == k, s"alpha must have length k, got: $alpha")
+ lda.getDocConcentration.foreachActive { case (_, x) =>
+ require(x >= 0, s"all entries in alpha must be >= 0, got: $alpha")
+ }
+ lda.getDocConcentration
+ }
this.eta = if (lda.getTopicConcentration == -1) 1.0 / k else lda.getTopicConcentration
this.randomGenerator = new Random(lda.getSeed)
@@ -372,7 +387,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
val vocabSize = this.vocabSize
val Elogbeta = dirichletExpectation(lambda).t
val expElogbeta = exp(Elogbeta)
- val alpha = this.alpha
+ val alpha = this.alpha.toBreeze
val gammaShape = this.gammaShape
val stats: RDD[BDM[Double]] = batch.mapPartitions { docs =>
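The alpha handling added to initialize() above has three accepting branches plus a length check; how they play out, again from within the clustering package with an assumed corpus RDD:

  // default Vector(-1): alpha becomes Vector(0.5, 0.5), i.e. 1.0 / k
  new OnlineLDAOptimizer().initialize(corpus, new LDA().setK(2))
  // singleton replicated: alpha becomes Vector(0.3, 0.3)
  new OnlineLDAOptimizer().initialize(corpus, new LDA().setK(2).setAlpha(0.3))
  // length-k vector used as-is
  new OnlineLDAOptimizer().initialize(corpus,
    new LDA().setK(2).setAlpha(Vectors.dense(0.1, 0.9)))
  // a length-3 vector with k = 2 fails the "alpha must have length k" require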
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
index 721a065658..da70d9bd7c 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.mllib.clustering
import breeze.linalg.{DenseMatrix => BDM}
import org.apache.spark.SparkFunSuite
-import org.apache.spark.mllib.linalg.{Vector, DenseMatrix, Matrix, Vectors}
+import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix, Vector, Vectors}
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.mllib.util.TestingUtils._
import org.apache.spark.util.Utils
@@ -132,22 +132,38 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
test("setter alias") {
val lda = new LDA().setAlpha(2.0).setBeta(3.0)
- assert(lda.getAlpha === 2.0)
- assert(lda.getDocConcentration === 2.0)
+ assert(lda.getAlpha.toArray.forall(_ === 2.0))
+ assert(lda.getDocConcentration.toArray.forall(_ === 2.0))
assert(lda.getBeta === 3.0)
assert(lda.getTopicConcentration === 3.0)
}
+ test("initializing with alpha length != k or 1 fails") {
+ intercept[IllegalArgumentException] {
+ val lda = new LDA().setK(2).setAlpha(Vectors.dense(1, 2, 3, 4))
+ val corpus = sc.parallelize(tinyCorpus, 2)
+ lda.run(corpus)
+ }
+ }
+
+ test("initializing with elements in alpha < 0 fails") {
+ intercept[IllegalArgumentException] {
+ val lda = new LDA().setK(4).setAlpha(Vectors.dense(-1, 2, 3, 4))
+ val corpus = sc.parallelize(tinyCorpus, 2)
+ lda.run(corpus)
+ }
+ }
+
test("OnlineLDAOptimizer initialization") {
val lda = new LDA().setK(2)
val corpus = sc.parallelize(tinyCorpus, 2)
val op = new OnlineLDAOptimizer().initialize(corpus, lda)
op.setKappa(0.9876).setMiniBatchFraction(0.123).setTau0(567)
- assert(op.getAlpha == 0.5) // default 1.0 / k
- assert(op.getEta == 0.5) // default 1.0 / k
- assert(op.getKappa == 0.9876)
- assert(op.getMiniBatchFraction == 0.123)
- assert(op.getTau0 == 567)
+ assert(op.getAlpha.toArray.forall(_ === 0.5)) // default 1.0 / k
+ assert(op.getEta === 0.5) // default 1.0 / k
+ assert(op.getKappa === 0.9876)
+ assert(op.getMiniBatchFraction === 0.123)
+ assert(op.getTau0 === 567)
}
test("OnlineLDAOptimizer one iteration") {
@@ -218,6 +234,56 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
}
}
+ test("OnlineLDAOptimizer with asymmetric prior") {
+ def toydata: Array[(Long, Vector)] = Array(
+ Vectors.sparse(6, Array(0, 1), Array(1, 1)),
+ Vectors.sparse(6, Array(1, 2), Array(1, 1)),
+ Vectors.sparse(6, Array(0, 2), Array(1, 1)),
+ Vectors.sparse(6, Array(3, 4), Array(1, 1)),
+ Vectors.sparse(6, Array(3, 5), Array(1, 1)),
+ Vectors.sparse(6, Array(4, 5), Array(1, 1))
+ ).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) }
+
+ val docs = sc.parallelize(toydata)
+ val op = new OnlineLDAOptimizer().setMiniBatchFraction(1).setTau0(1024).setKappa(0.51)
+ .setGammaShape(1e10)
+ val lda = new LDA().setK(2)
+ .setDocConcentration(Vectors.dense(0.00001, 0.1))
+ .setTopicConcentration(0.01)
+ .setMaxIterations(100)
+ .setOptimizer(op)
+ .setSeed(12345)
+
+ val ldaModel = lda.run(docs)
+ val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 10)
+ val topics = topicIndices.map { case (terms, termWeights) =>
+ terms.zip(termWeights)
+ }
+
+ /* Verify results with Python:
+
+ import numpy as np
+ from gensim import models
+ corpus = [
+ [(0, 1.0), (1, 1.0)],
+ [(1, 1.0), (2, 1.0)],
+ [(0, 1.0), (2, 1.0)],
+ [(3, 1.0), (4, 1.0)],
+ [(3, 1.0), (5, 1.0)],
+ [(4, 1.0), (5, 1.0)]]
+ np.random.seed(10)
+ lda = models.ldamodel.LdaModel(
+ corpus=corpus, alpha=np.array([0.00001, 0.1]), num_topics=2, update_every=0, passes=100)
+ lda.print_topics()
+
+ > ['0.167*0 + 0.167*1 + 0.167*2 + 0.167*3 + 0.167*4 + 0.167*5',
+ '0.167*0 + 0.167*1 + 0.167*2 + 0.167*4 + 0.167*3 + 0.167*5']
+ */
+ topics.foreach { topic =>
+ assert(topic.forall { case (_, p) => p ~= 0.167 absTol 0.05 })
+ }
+ }
+
test("model save/load") {
// Test for LocalLDAModel.
val localModel = new LocalLDAModel(tinyTopics)