path: root/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
diff options
Diffstat (limited to 'mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala')
1 files changed, 107 insertions, 28 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index fe6a37fd6d..c57ceba4a9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -17,21 +17,22 @@
package org.apache.spark.ml.clustering
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
import org.apache.spark.internal.Logging
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared.{HasCheckpointInterval, HasFeaturesCol, HasMaxIter, HasSeed}
import org.apache.spark.ml.util._
import org.apache.spark.mllib.clustering.{DistributedLDAModel => OldDistributedLDAModel,
- EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => OldLDAModel,
- LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel,
- OnlineLDAOptimizer => OldOnlineLDAOptimizer}
+ EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => OldLDAModel,
+ LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel,
+ OnlineLDAOptimizer => OldOnlineLDAOptimizer}
+import org.apache.spark.mllib.impl.PeriodicCheckpointer
import org.apache.spark.mllib.linalg.{Matrix, Vector, Vectors, VectorUDT}
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext}
import org.apache.spark.sql.functions.{col, monotonicallyIncreasingId, udf}
import org.apache.spark.sql.types.StructType
@@ -41,6 +42,7 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
* Param for the number of topics (clusters) to infer. Must be > 1. Default: 10.
+ *
* @group param
@@ -173,10 +175,11 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
* This uses a variational approximation following Hoffman et al. (2010), where the approximate
* distribution is called "gamma." Technically, this method returns this approximation "gamma"
* for each document.
+ *
* @group param
- final val topicDistributionCol = new Param[String](this, "topicDistribution", "Output column" +
+ final val topicDistributionCol = new Param[String](this, "topicDistributionCol", "Output column" +
" with estimates of the topic mixture distribution for each document (often called \"theta\"" +
" in the literature). Returns a vector of zeros for an empty document.")
@@ -187,15 +190,19 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
def getTopicDistributionCol: String = $(topicDistributionCol)
+ * For Online optimizer only: [[optimizer]] = "online".
+ *
* A (positive) learning parameter that downweights early iterations. Larger values make early
* iterations count less.
* This is called "tau0" in the Online LDA paper (Hoffman et al., 2010)
* Default: 1024, following Hoffman et al.
+ *
* @group expertParam
- final val learningOffset = new DoubleParam(this, "learningOffset", "A (positive) learning" +
- " parameter that downweights early iterations. Larger values make early iterations count less.",
+ final val learningOffset = new DoubleParam(this, "learningOffset", "(For online optimizer)" +
+ " A (positive) learning parameter that downweights early iterations. Larger values make early" +
+ " iterations count less.",
/** @group expertGetParam */
@@ -203,22 +210,27 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
def getLearningOffset: Double = $(learningOffset)
+ * For Online optimizer only: [[optimizer]] = "online".
+ *
* Learning rate, set as an exponential decay rate.
* This should be between (0.5, 1.0] to guarantee asymptotic convergence.
* This is called "kappa" in the Online LDA paper (Hoffman et al., 2010).
* Default: 0.51, based on Hoffman et al.
+ *
* @group expertParam
- final val learningDecay = new DoubleParam(this, "learningDecay", "Learning rate, set as an" +
- " exponential decay rate. This should be between (0.5, 1.0] to guarantee asymptotic" +
- " convergence.", ParamValidators.gt(0))
+ final val learningDecay = new DoubleParam(this, "learningDecay", "(For online optimizer)" +
+ " Learning rate, set as an exponential decay rate. This should be between (0.5, 1.0] to" +
+ " guarantee asymptotic convergence.", ParamValidators.gt(0))
/** @group expertGetParam */
def getLearningDecay: Double = $(learningDecay)
+ * For Online optimizer only: [[optimizer]] = "online".
+ *
* Fraction of the corpus to be sampled and used in each iteration of mini-batch gradient descent,
* in range (0, 1].
@@ -230,11 +242,13 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
* [[org.apache.spark.mllib.clustering.OnlineLDAOptimizer]].
* Default: 0.05, i.e., 5% of total documents.
+ *
* @group param
- final val subsamplingRate = new DoubleParam(this, "subsamplingRate", "Fraction of the corpus" +
- " to be sampled and used in each iteration of mini-batch gradient descent, in range (0, 1].",
+ final val subsamplingRate = new DoubleParam(this, "subsamplingRate", "(For online optimizer)" +
+ " Fraction of the corpus to be sampled and used in each iteration of mini-batch" +
+ " gradient descent, in range (0, 1].",
ParamValidators.inRange(0.0, 1.0, lowerInclusive = false, upperInclusive = true))
/** @group getParam */
@@ -242,23 +256,52 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
def getSubsamplingRate: Double = $(subsamplingRate)
+ * For Online optimizer only (currently): [[optimizer]] = "online".
+ *
* Indicates whether the docConcentration (Dirichlet parameter for
* document-topic distribution) will be optimized during training.
* Setting this to true will make the model more expressive and fit the training data better.
* Default: false
+ *
* @group expertParam
final val optimizeDocConcentration = new BooleanParam(this, "optimizeDocConcentration",
- "Indicates whether the docConcentration (Dirichlet parameter for document-topic" +
- " distribution) will be optimized during training.")
+ "(For online optimizer only, currently) Indicates whether the docConcentration" +
+ " (Dirichlet parameter for document-topic distribution) will be optimized during training.")
/** @group expertGetParam */
def getOptimizeDocConcentration: Boolean = $(optimizeDocConcentration)
+ * For EM optimizer only: [[optimizer]] = "em".
+ *
+ * If using checkpointing, this indicates whether to keep the last
+ * checkpoint. If false, then the checkpoint will be deleted. Deleting the checkpoint can
+ * cause failures if a data partition is lost, so set this bit with care.
+ * Note that checkpoints will be cleaned up via reference counting, regardless.
+ *
+ * See [[DistributedLDAModel.getCheckpointFiles]] for getting remaining checkpoints and
+ * [[DistributedLDAModel.deleteCheckpointFiles]] for removing remaining checkpoints.
+ *
+ * Default: true
+ *
+ * @group expertParam
+ */
+ @Since("2.0.0")
+ final val keepLastCheckpoint = new BooleanParam(this, "keepLastCheckpoint",
+ "(For EM optimizer) If using checkpointing, this indicates whether to keep the last" +
+ " checkpoint. If false, then the checkpoint will be deleted. Deleting the checkpoint can" +
+ " cause failures if a data partition is lost, so set this bit with care.")
+ /** @group expertGetParam */
+ @Since("2.0.0")
+ def getKeepLastCheckpoint: Boolean = $(keepLastCheckpoint)
+ /**
* Validates and transforms the input schema.
+ *
* @param schema input schema
* @return output schema
@@ -303,6 +346,7 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
case "em" =>
new OldEMLDAOptimizer()
+ .setKeepLastCheckpoint($(keepLastCheckpoint))
@@ -341,6 +385,7 @@ sealed abstract class LDAModel private[ml] (
* The features for LDA should be a [[Vector]] representing the word counts in a document.
* The vector should be of length vocabSize, with counts for each term (word).
+ *
* @group setParam
@@ -357,15 +402,15 @@ sealed abstract class LDAModel private[ml] (
* is set to "em"), this involves collecting a large [[topicsMatrix]] to the driver.
* This implementation may be changed in the future.
- @Since("1.6.0")
- override def transform(dataset: DataFrame): DataFrame = {
+ @Since("2.0.0")
+ override def transform(dataset: Dataset[_]): DataFrame = {
if ($(topicDistributionCol).nonEmpty) {
val t = udf(oldLocalModel.getTopicDistributionMethod(sqlContext.sparkContext))
- dataset.withColumn($(topicDistributionCol), t(col($(featuresCol))))
+ dataset.withColumn($(topicDistributionCol), t(col($(featuresCol)))).toDF
} else {
logWarning("LDAModel.transform was called without any output columns. Set an output column" +
" such as topicDistributionCol to produce results.")
- dataset
+ dataset.toDF
@@ -410,8 +455,8 @@ sealed abstract class LDAModel private[ml] (
* @param dataset test corpus to use for calculating log likelihood
* @return variational lower bound on the log likelihood of the entire corpus
- @Since("1.6.0")
- def logLikelihood(dataset: DataFrame): Double = {
+ @Since("2.0.0")
+ def logLikelihood(dataset: Dataset[_]): Double = {
val oldDataset = LDA.getOldDataset(dataset, $(featuresCol))
@@ -427,8 +472,8 @@ sealed abstract class LDAModel private[ml] (
* @param dataset test corpus to use for calculating perplexity
* @return Variational upper bound on log perplexity per token.
- @Since("1.6.0")
- def logPerplexity(dataset: DataFrame): Double = {
+ @Since("2.0.0")
+ def logPerplexity(dataset: Dataset[_]): Double = {
val oldDataset = LDA.getOldDataset(dataset, $(featuresCol))
@@ -619,6 +664,35 @@ class DistributedLDAModel private[ml] (
lazy val logPrior: Double = oldDistributedModel.logPrior
+ private var _checkpointFiles: Array[String] = oldDistributedModel.checkpointFiles
+ /**
+ * If using checkpointing and [[LDA.keepLastCheckpoint]] is set to true, then there may be
+ * saved checkpoint files. This method is provided so that users can manage those files.
+ *
+ * Note that removing the checkpoints can cause failures if a partition is lost and is needed
+ * by certain [[DistributedLDAModel]] methods. Reference counting will clean up the checkpoints
+ * when this model and derivative data go out of scope.
+ *
+ * @return Checkpoint files from training
+ */
+ @DeveloperApi
+ @Since("2.0.0")
+ def getCheckpointFiles: Array[String] = _checkpointFiles
+ /**
+ * Remove any remaining checkpoint files from training.
+ *
+ * @see [[getCheckpointFiles]]
+ */
+ @DeveloperApi
+ @Since("2.0.0")
+ def deleteCheckpointFiles(): Unit = {
+ val fs = FileSystem.get(sqlContext.sparkContext.hadoopConfiguration)
+ _checkpointFiles.foreach(PeriodicCheckpointer.removeCheckpointFile(_, fs))
+ _checkpointFiles = Array.empty[String]
+ }
override def write: MLWriter = new DistributedLDAModel.DistributedWriter(this)
@@ -696,11 +770,12 @@ class LDA @Since("1.6.0") (
setDefault(maxIter -> 20, k -> 10, optimizer -> "online", checkpointInterval -> 10,
learningOffset -> 1024, learningDecay -> 0.51, subsamplingRate -> 0.05,
- optimizeDocConcentration -> true)
+ optimizeDocConcentration -> true, keepLastCheckpoint -> true)
* The features for LDA should be a [[Vector]] representing the word counts in a document.
* The vector should be of length vocabSize, with counts for each term (word).
+ *
* @group setParam
@@ -758,11 +833,15 @@ class LDA @Since("1.6.0") (
def setOptimizeDocConcentration(value: Boolean): this.type = set(optimizeDocConcentration, value)
+ /** @group expertSetParam */
+ @Since("2.0.0")
+ def setKeepLastCheckpoint(value: Boolean): this.type = set(keepLastCheckpoint, value)
override def copy(extra: ParamMap): LDA = defaultCopy(extra)
- @Since("1.6.0")
- override def fit(dataset: DataFrame): LDAModel = {
+ @Since("2.0.0")
+ override def fit(dataset: Dataset[_]): LDAModel = {
transformSchema(dataset.schema, logging = true)
val oldLDA = new OldLDA()
@@ -794,7 +873,7 @@ class LDA @Since("1.6.0") (
private[clustering] object LDA extends DefaultParamsReadable[LDA] {
/** Get dataset for spark.mllib LDA */
- def getOldDataset(dataset: DataFrame, featuresCol: String): RDD[(Long, Vector)] = {
+ def getOldDataset(dataset: Dataset[_], featuresCol: String): RDD[(Long, Vector)] = {
.withColumn("docId", monotonicallyIncreasingId())
.select("docId", featuresCol)