Diffstat (limited to 'mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala  135
1 file changed, 107 insertions(+), 28 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index fe6a37fd6d..c57ceba4a9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -17,21 +17,22 @@
 package org.apache.spark.ml.clustering
 
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 
-import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.{Estimator, Model}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared.{HasCheckpointInterval, HasFeaturesCol, HasMaxIter, HasSeed}
 import org.apache.spark.ml.util._
 import org.apache.spark.mllib.clustering.{DistributedLDAModel => OldDistributedLDAModel,
-  EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => OldLDAModel,
-  LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel,
-  OnlineLDAOptimizer => OldOnlineLDAOptimizer}
+  EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => OldLDAModel,
+  LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel,
+  OnlineLDAOptimizer => OldOnlineLDAOptimizer}
+import org.apache.spark.mllib.impl.PeriodicCheckpointer
 import org.apache.spark.mllib.linalg.{Matrix, Vector, Vectors, VectorUDT}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext}
 import org.apache.spark.sql.functions.{col, monotonicallyIncreasingId, udf}
 import org.apache.spark.sql.types.StructType
@@ -41,6 +42,7 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
 
   /**
    * Param for the number of topics (clusters) to infer. Must be > 1. Default: 10.
+   *
    * @group param
    */
   @Since("1.6.0")
@@ -173,10 +175,11 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
    * This uses a variational approximation following Hoffman et al. (2010), where the approximate
    * distribution is called "gamma." Technically, this method returns this approximation "gamma"
    * for each document.
+   *
    * @group param
    */
   @Since("1.6.0")
-  final val topicDistributionCol = new Param[String](this, "topicDistribution", "Output column" +
+  final val topicDistributionCol = new Param[String](this, "topicDistributionCol", "Output column" +
     " with estimates of the topic mixture distribution for each document (often called \"theta\"" +
     " in the literature). Returns a vector of zeros for an empty document.")
 
@@ -187,15 +190,19 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   def getTopicDistributionCol: String = $(topicDistributionCol)
 
   /**
+   * For Online optimizer only: [[optimizer]] = "online".
+   *
    * A (positive) learning parameter that downweights early iterations. Larger values make early
    * iterations count less.
    * This is called "tau0" in the Online LDA paper (Hoffman et al., 2010)
    * Default: 1024, following Hoffman et al.
+   *
    * @group expertParam
    */
   @Since("1.6.0")
-  final val learningOffset = new DoubleParam(this, "learningOffset", "A (positive) learning" +
-    " parameter that downweights early iterations. Larger values make early iterations count less.",
+  final val learningOffset = new DoubleParam(this, "learningOffset", "(For online optimizer)" +
+    " A (positive) learning parameter that downweights early iterations. Larger values make early" +
+    " iterations count less.",
     ParamValidators.gt(0))
 
   /** @group expertGetParam */
@@ -203,22 +210,27 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   def getLearningOffset: Double = $(learningOffset)
 
   /**
+   * For Online optimizer only: [[optimizer]] = "online".
+   *
    * Learning rate, set as an exponential decay rate.
    * This should be between (0.5, 1.0] to guarantee asymptotic convergence.
    * This is called "kappa" in the Online LDA paper (Hoffman et al., 2010).
    * Default: 0.51, based on Hoffman et al.
+   *
    * @group expertParam
    */
   @Since("1.6.0")
-  final val learningDecay = new DoubleParam(this, "learningDecay", "Learning rate, set as an" +
-    " exponential decay rate. This should be between (0.5, 1.0] to guarantee asymptotic" +
-    " convergence.", ParamValidators.gt(0))
+  final val learningDecay = new DoubleParam(this, "learningDecay", "(For online optimizer)" +
+    " Learning rate, set as an exponential decay rate. This should be between (0.5, 1.0] to" +
+    " guarantee asymptotic convergence.", ParamValidators.gt(0))
 
   /** @group expertGetParam */
   @Since("1.6.0")
   def getLearningDecay: Double = $(learningDecay)
 
   /**
+   * For Online optimizer only: [[optimizer]] = "online".
+   *
    * Fraction of the corpus to be sampled and used in each iteration of mini-batch gradient descent,
    * in range (0, 1].
    *
@@ -230,11 +242,13 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
    * [[org.apache.spark.mllib.clustering.OnlineLDAOptimizer]].
    *
    * Default: 0.05, i.e., 5% of total documents.
+   *
    * @group param
    */
   @Since("1.6.0")
-  final val subsamplingRate = new DoubleParam(this, "subsamplingRate", "Fraction of the corpus" +
-    " to be sampled and used in each iteration of mini-batch gradient descent, in range (0, 1].",
+  final val subsamplingRate = new DoubleParam(this, "subsamplingRate", "(For online optimizer)" +
+    " Fraction of the corpus to be sampled and used in each iteration of mini-batch" +
+    " gradient descent, in range (0, 1].",
     ParamValidators.inRange(0.0, 1.0, lowerInclusive = false, upperInclusive = true))
 
   /** @group getParam */
@@ -242,23 +256,52 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
   def getSubsamplingRate: Double = $(subsamplingRate)
 
   /**
+   * For Online optimizer only (currently): [[optimizer]] = "online".
+   *
    * Indicates whether the docConcentration (Dirichlet parameter for
    * document-topic distribution) will be optimized during training.
    * Setting this to true will make the model more expressive and fit the training data better.
    * Default: false
+   *
    * @group expertParam
    */
   @Since("1.6.0")
   final val optimizeDocConcentration = new BooleanParam(this, "optimizeDocConcentration",
-    "Indicates whether the docConcentration (Dirichlet parameter for document-topic" +
-    " distribution) will be optimized during training.")
+    "(For online optimizer only, currently) Indicates whether the docConcentration" +
+    " (Dirichlet parameter for document-topic distribution) will be optimized during training.")
 
   /** @group expertGetParam */
   @Since("1.6.0")
   def getOptimizeDocConcentration: Boolean = $(optimizeDocConcentration)
 
   /**
+   * For EM optimizer only: [[optimizer]] = "em".
+   *
+   * If using checkpointing, this indicates whether to keep the last
+   * checkpoint. If false, then the checkpoint will be deleted. Deleting the checkpoint can
+   * cause failures if a data partition is lost, so set this bit with care.
+   * Note that checkpoints will be cleaned up via reference counting, regardless.
+   *
+   * See [[DistributedLDAModel.getCheckpointFiles]] for getting remaining checkpoints and
+   * [[DistributedLDAModel.deleteCheckpointFiles]] for removing remaining checkpoints.
+   *
+   * Default: true
+   *
+   * @group expertParam
+   */
+  @Since("2.0.0")
+  final val keepLastCheckpoint = new BooleanParam(this, "keepLastCheckpoint",
+    "(For EM optimizer) If using checkpointing, this indicates whether to keep the last" +
+    " checkpoint. If false, then the checkpoint will be deleted. Deleting the checkpoint can" +
+    " cause failures if a data partition is lost, so set this bit with care.")
+
+  /** @group expertGetParam */
+  @Since("2.0.0")
+  def getKeepLastCheckpoint: Boolean = $(keepLastCheckpoint)
+
+  /**
    * Validates and transforms the input schema.
+   *
    * @param schema input schema
    * @return output schema
    */
@@ -303,6 +346,7 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
           .setOptimizeDocConcentration($(optimizeDocConcentration))
       case "em" =>
         new OldEMLDAOptimizer()
+          .setKeepLastCheckpoint($(keepLastCheckpoint))
     }
   }
 
@@ -341,6 +385,7 @@ sealed abstract class LDAModel private[ml] (
   /**
    * The features for LDA should be a [[Vector]] representing the word counts in a document.
    * The vector should be of length vocabSize, with counts for each term (word).
+   *
    * @group setParam
    */
   @Since("1.6.0")
@@ -357,15 +402,15 @@ sealed abstract class LDAModel private[ml] (
    * is set to "em"), this involves collecting a large [[topicsMatrix]] to the driver.
    * This implementation may be changed in the future.
    */
-  @Since("1.6.0")
-  override def transform(dataset: DataFrame): DataFrame = {
+  @Since("2.0.0")
+  override def transform(dataset: Dataset[_]): DataFrame = {
     if ($(topicDistributionCol).nonEmpty) {
       val t = udf(oldLocalModel.getTopicDistributionMethod(sqlContext.sparkContext))
-      dataset.withColumn($(topicDistributionCol), t(col($(featuresCol))))
+      dataset.withColumn($(topicDistributionCol), t(col($(featuresCol)))).toDF
     } else {
       logWarning("LDAModel.transform was called without any output columns. Set an output column" +
         " such as topicDistributionCol to produce results.")
-      dataset
+      dataset.toDF
     }
   }
 
@@ -410,8 +455,8 @@ sealed abstract class LDAModel private[ml] (
    * @param dataset test corpus to use for calculating log likelihood
    * @return variational lower bound on the log likelihood of the entire corpus
    */
-  @Since("1.6.0")
-  def logLikelihood(dataset: DataFrame): Double = {
+  @Since("2.0.0")
+  def logLikelihood(dataset: Dataset[_]): Double = {
     val oldDataset = LDA.getOldDataset(dataset, $(featuresCol))
     oldLocalModel.logLikelihood(oldDataset)
   }
@@ -427,8 +472,8 @@ sealed abstract class LDAModel private[ml] (
    * @param dataset test corpus to use for calculating perplexity
    * @return Variational upper bound on log perplexity per token.
    */
-  @Since("1.6.0")
-  def logPerplexity(dataset: DataFrame): Double = {
+  @Since("2.0.0")
+  def logPerplexity(dataset: Dataset[_]): Double = {
     val oldDataset = LDA.getOldDataset(dataset, $(featuresCol))
     oldLocalModel.logPerplexity(oldDataset)
   }
@@ -619,6 +664,35 @@ class DistributedLDAModel private[ml] (
   @Since("1.6.0")
   lazy val logPrior: Double = oldDistributedModel.logPrior
 
+  private var _checkpointFiles: Array[String] = oldDistributedModel.checkpointFiles
+
+  /**
+   * If using checkpointing and [[LDA.keepLastCheckpoint]] is set to true, then there may be
+   * saved checkpoint files. This method is provided so that users can manage those files.
+   *
+   * Note that removing the checkpoints can cause failures if a partition is lost and is needed
+   * by certain [[DistributedLDAModel]] methods. Reference counting will clean up the checkpoints
+   * when this model and derivative data go out of scope.
+   *
+   * @return Checkpoint files from training
+   */
+  @DeveloperApi
+  @Since("2.0.0")
+  def getCheckpointFiles: Array[String] = _checkpointFiles
+
+  /**
+   * Remove any remaining checkpoint files from training.
+   *
+   * @see [[getCheckpointFiles]]
+   */
+  @DeveloperApi
+  @Since("2.0.0")
+  def deleteCheckpointFiles(): Unit = {
+    val fs = FileSystem.get(sqlContext.sparkContext.hadoopConfiguration)
+    _checkpointFiles.foreach(PeriodicCheckpointer.removeCheckpointFile(_, fs))
+    _checkpointFiles = Array.empty[String]
+  }
+
   @Since("1.6.0")
   override def write: MLWriter = new DistributedLDAModel.DistributedWriter(this)
 }
@@ -696,11 +770,12 @@ class LDA @Since("1.6.0") (
 
   setDefault(maxIter -> 20, k -> 10, optimizer -> "online", checkpointInterval -> 10,
     learningOffset -> 1024, learningDecay -> 0.51, subsamplingRate -> 0.05,
-    optimizeDocConcentration -> true)
+    optimizeDocConcentration -> true, keepLastCheckpoint -> true)
 
   /**
    * The features for LDA should be a [[Vector]] representing the word counts in a document.
    * The vector should be of length vocabSize, with counts for each term (word).
+   *
    * @group setParam
    */
   @Since("1.6.0")
@@ -758,11 +833,15 @@ class LDA @Since("1.6.0") (
   @Since("1.6.0")
   def setOptimizeDocConcentration(value: Boolean): this.type = set(optimizeDocConcentration, value)
 
+  /** @group expertSetParam */
+  @Since("2.0.0")
+  def setKeepLastCheckpoint(value: Boolean): this.type = set(keepLastCheckpoint, value)
+
   @Since("1.6.0")
   override def copy(extra: ParamMap): LDA = defaultCopy(extra)
 
-  @Since("1.6.0")
-  override def fit(dataset: DataFrame): LDAModel = {
+  @Since("2.0.0")
+  override def fit(dataset: Dataset[_]): LDAModel = {
     transformSchema(dataset.schema, logging = true)
     val oldLDA = new OldLDA()
       .setK($(k))
@@ -794,7 +873,7 @@ class LDA @Since("1.6.0") (
 private[clustering] object LDA extends DefaultParamsReadable[LDA] {
 
   /** Get dataset for spark.mllib LDA */
-  def getOldDataset(dataset: DataFrame, featuresCol: String): RDD[(Long, Vector)] = {
+  def getOldDataset(dataset: Dataset[_], featuresCol: String): RDD[(Long, Vector)] = {
     dataset
       .withColumn("docId", monotonicallyIncreasingId())
       .select("docId", featuresCol)
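For context, a minimal sketch of how the checkpoint-management API added by this patch might be used. It assumes a Spark 2.0-era SparkSession named spark and a DataFrame corpus with a "features" column of word-count vectors; those names and the checkpoint directory are illustrative, not part of the patch.

import org.apache.spark.ml.clustering.{DistributedLDAModel, LDA}

// Checkpointing requires a checkpoint directory (path is a placeholder).
spark.sparkContext.setCheckpointDir("/tmp/lda-checkpoints")

val lda = new LDA()
  .setK(10)
  .setMaxIter(50)
  .setOptimizer("em")            // keepLastCheckpoint applies to the EM optimizer only
  .setCheckpointInterval(10)
  .setKeepLastCheckpoint(true)   // new in this patch; defaults to true

// With the EM optimizer, fit returns a DistributedLDAModel.
val model = lda.fit(corpus).asInstanceOf[DistributedLDAModel]

// DeveloperApi methods added above: list the checkpoint files left over from
// training, then remove them once no further model methods need that data.
val files: Array[String] = model.getCheckpointFiles
model.deleteCheckpointFiles()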
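Similarly, a hedged sketch of the online-optimizer parameters whose descriptions the patch rewords, set here to their documented defaults; corpus is the same assumed input as above.

import org.apache.spark.ml.clustering.LDA

val onlineLda = new LDA()
  .setOptimizer("online")
  .setK(10)
  .setLearningOffset(1024.0)           // "tau0": downweights early iterations
  .setLearningDecay(0.51)              // "kappa": in (0.5, 1.0] for asymptotic convergence
  .setSubsamplingRate(0.05)            // mini-batch fraction per iteration, in (0, 1]
  .setOptimizeDocConcentration(true)   // online optimizer only (currently)

val onlineModel = onlineLda.fit(corpus)

// fit, transform, logLikelihood, and logPerplexity now accept Dataset[_] as well as DataFrame.
val scored = onlineModel.transform(corpus)      // adds the topicDistributionCol output
val bound = onlineModel.logPerplexity(corpus)   // variational upper bound per token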