| author | Xiangrui Meng <meng@databricks.com> | 2015-08-19 13:17:26 -0700 |
|---|---|---|
| committer | Xiangrui Meng <meng@databricks.com> | 2015-08-19 13:17:26 -0700 |
| commit | 5b62bef8cbf73f910513ef3b1f557aa94b384854 (patch) | |
| tree | 1f49cffb09ed5bac692b1f796ee6b18b21a33600 /mllib | |
| parent | d898c33f774b9a3db2fb6aa8f0cb2c2ac6004b58 (diff) | |
[SPARK-8918] [MLLIB] [DOC] Add @since tags to mllib.clustering
This continues the work from #8256. I removed `since` tags from private/protected/local methods/variables (see https://github.com/apache/spark/commit/72fdeb64630470f6f46cf3eed8ffbfe83a7c4659). MechCoder
Closes #8256
Author: Xiangrui Meng <meng@databricks.com>
Author: Xiaoqing Wang <spark445@126.com>
Author: MechCoder <manojkumarsivaraj334@gmail.com>
Closes #8288 from mengxr/SPARK-8918.
Diffstat (limited to 'mllib')
9 files changed, 338 insertions(+), 52 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala
index e459367333..bc27b1fe73 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala
@@ -62,6 +62,7 @@ class GaussianMixture private (
   /**
    * Constructs a default instance. The default parameters are {k: 2, convergenceTol: 0.01,
    * maxIterations: 100, seed: random}.
+   * @since 1.3.0
    */
   def this() = this(2, 0.01, 100, Utils.random.nextLong())

@@ -72,9 +73,11 @@ class GaussianMixture private (
   // default random starting point
   private var initialModel: Option[GaussianMixtureModel] = None

-  /** Set the initial GMM starting point, bypassing the random initialization.
-    * You must call setK() prior to calling this method, and the condition
-    * (model.k == this.k) must be met; failure will result in an IllegalArgumentException
+  /**
+   * Set the initial GMM starting point, bypassing the random initialization.
+   * You must call setK() prior to calling this method, and the condition
+   * (model.k == this.k) must be met; failure will result in an IllegalArgumentException
+   * @since 1.3.0
    */
   def setInitialModel(model: GaussianMixtureModel): this.type = {
     if (model.k == k) {
@@ -85,30 +88,46 @@ class GaussianMixture private (
     this
   }

-  /** Return the user supplied initial GMM, if supplied */
+  /**
+   * Return the user supplied initial GMM, if supplied
+   * @since 1.3.0
+   */
   def getInitialModel: Option[GaussianMixtureModel] = initialModel

-  /** Set the number of Gaussians in the mixture model. Default: 2 */
+  /**
+   * Set the number of Gaussians in the mixture model. Default: 2
+   * @since 1.3.0
+   */
   def setK(k: Int): this.type = {
     this.k = k
     this
   }

-  /** Return the number of Gaussians in the mixture model */
+  /**
+   * Return the number of Gaussians in the mixture model
+   * @since 1.3.0
+   */
   def getK: Int = k

-  /** Set the maximum number of iterations to run. Default: 100 */
+  /**
+   * Set the maximum number of iterations to run. Default: 100
+   * @since 1.3.0
+   */
   def setMaxIterations(maxIterations: Int): this.type = {
     this.maxIterations = maxIterations
     this
   }

-  /** Return the maximum number of iterations to run */
+  /**
+   * Return the maximum number of iterations to run
+   * @since 1.3.0
+   */
   def getMaxIterations: Int = maxIterations

   /**
    * Set the largest change in log-likelihood at which convergence is
    * considered to have occurred.
+   * @since 1.3.0
    */
   def setConvergenceTol(convergenceTol: Double): this.type = {
     this.convergenceTol = convergenceTol
@@ -118,19 +137,29 @@ class GaussianMixture private (
   /**
    * Return the largest change in log-likelihood at which convergence is
    * considered to have occurred.
+   * @since 1.3.0
    */
   def getConvergenceTol: Double = convergenceTol

-  /** Set the random seed */
+  /**
+   * Set the random seed
+   * @since 1.3.0
+   */
   def setSeed(seed: Long): this.type = {
     this.seed = seed
     this
   }

-  /** Return the random seed */
+  /**
+   * Return the random seed
+   * @since 1.3.0
+   */
   def getSeed: Long = seed

-  /** Perform expectation maximization */
+  /**
+   * Perform expectation maximization
+   * @since 1.3.0
+   */
   def run(data: RDD[Vector]): GaussianMixtureModel = {
     val sc = data.sparkContext
@@ -204,7 +233,10 @@ class GaussianMixture private (
     new GaussianMixtureModel(weights, gaussians)
   }

-  /** Java-friendly version of [[run()]] */
+  /**
+   * Java-friendly version of [[run()]]
+   * @since 1.3.0
+   */
   def run(data: JavaRDD[Vector]): GaussianMixtureModel = run(data.rdd)

   private def updateWeightsAndGaussians(
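For reference, the GaussianMixture surface annotated above composes like this. A minimal sketch assuming a local Spark 1.5-era setup; the toy data, app name, and master URL are illustrative, not part of the commit:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.GaussianMixture
import org.apache.spark.mllib.linalg.Vectors

val sc = new SparkContext(new SparkConf().setAppName("gmm-sketch").setMaster("local[2]"))
// Two loose 2-D blobs; any RDD[Vector] works.
val data = sc.parallelize(Seq(
  Vectors.dense(0.1, 0.2), Vectors.dense(-0.3, 0.1),
  Vectors.dense(9.8, 10.1), Vectors.dense(10.2, 9.7)))
// Builder-style setters tagged @since 1.3.0 in the hunks above.
val gmm = new GaussianMixture()
  .setK(2)
  .setMaxIterations(50)
  .setConvergenceTol(0.001)
  .setSeed(42L)
  .run(data)
gmm.weights.zip(gmm.gaussians).foreach { case (w, g) =>
  println(s"weight=$w mu=${g.mu}")
}
```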
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
index 76aeebd703..2fa0473737 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
@@ -43,6 +43,7 @@ import org.apache.spark.sql.{SQLContext, Row}
  *                the weight for Gaussian i, and weights.sum == 1
  * @param gaussians Array of MultivariateGaussian where gaussians(i) represents
  *                  the Multivariate Gaussian (Normal) Distribution for Gaussian i
+ * @since 1.3.0
  */
 @Experimental
 class GaussianMixtureModel(
@@ -53,32 +54,48 @@ class GaussianMixtureModel(

   override protected def formatVersion = "1.0"

+  /**
+   * @since 1.4.0
+   */
   override def save(sc: SparkContext, path: String): Unit = {
     GaussianMixtureModel.SaveLoadV1_0.save(sc, path, weights, gaussians)
   }

-  /** Number of gaussians in mixture */
+  /**
+   * Number of gaussians in mixture
+   * @since 1.3.0
+   */
   def k: Int = weights.length

-  /** Maps given points to their cluster indices. */
+  /**
+   * Maps given points to their cluster indices.
+   * @since 1.3.0
+   */
   def predict(points: RDD[Vector]): RDD[Int] = {
     val responsibilityMatrix = predictSoft(points)
     responsibilityMatrix.map(r => r.indexOf(r.max))
   }

-  /** Maps given point to its cluster index. */
+  /**
+   * Maps given point to its cluster index.
+   * @since 1.5.0
+   */
   def predict(point: Vector): Int = {
     val r = computeSoftAssignments(point.toBreeze.toDenseVector, gaussians, weights, k)
     r.indexOf(r.max)
   }

-  /** Java-friendly version of [[predict()]] */
+  /**
+   * Java-friendly version of [[predict()]]
+   * @since 1.4.0
+   */
   def predict(points: JavaRDD[Vector]): JavaRDD[java.lang.Integer] =
     predict(points.rdd).toJavaRDD().asInstanceOf[JavaRDD[java.lang.Integer]]

   /**
    * Given the input vectors, return the membership value of each vector
    * to all mixture components.
+   * @since 1.3.0
    */
   def predictSoft(points: RDD[Vector]): RDD[Array[Double]] = {
     val sc = points.sparkContext
@@ -91,6 +108,7 @@ class GaussianMixtureModel(

   /**
    * Given the input vector, return the membership values to all mixture components.
+   * @since 1.4.0
    */
   def predictSoft(point: Vector): Array[Double] = {
     computeSoftAssignments(point.toBreeze.toDenseVector, gaussians, weights, k)
@@ -115,6 +133,9 @@ class GaussianMixtureModel(
   }
 }

+/**
+ * @since 1.4.0
+ */
 @Experimental
 object GaussianMixtureModel extends Loader[GaussianMixtureModel] {
@@ -165,6 +186,9 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] {
     }
   }

+  /**
+   * @since 1.4.0
+   */
   override def load(sc: SparkContext, path: String) : GaussianMixtureModel = {
     val (loadedClassName, version, metadata) = Loader.loadMetadata(sc, path)
     implicit val formats = DefaultFormats
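The model half mirrors that surface: predict (hard assignments), predictSoft (responsibilities), and the 1.4.0 save/load pair. A short continuation of the sketch above; sc, gmm, and the path are assumed/illustrative:

```scala
import org.apache.spark.mllib.clustering.GaussianMixtureModel
import org.apache.spark.mllib.linalg.Vectors

gmm.save(sc, "/tmp/gmm-model")                          // @since 1.4.0
val reloaded = GaussianMixtureModel.load(sc, "/tmp/gmm-model")
val hard: Int = reloaded.predict(Vectors.dense(10.0, 10.0))                // @since 1.5.0
val soft: Array[Double] = reloaded.predictSoft(Vectors.dense(10.0, 10.0)) // @since 1.4.0
println(s"cluster=$hard memberships=${soft.mkString(",")}")
```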
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index 0a65403f4e..9ef6834e5e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -49,15 +49,20 @@ class KMeans private (
   /**
    * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, runs: 1,
    * initializationMode: "k-means||", initializationSteps: 5, epsilon: 1e-4, seed: random}.
+   * @since 0.8.0
    */
   def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4, Utils.random.nextLong())

   /**
    * Number of clusters to create (k).
+   * @since 1.4.0
    */
   def getK: Int = k

-  /** Set the number of clusters to create (k). Default: 2. */
+  /**
+   * Set the number of clusters to create (k). Default: 2.
+   * @since 0.8.0
+   */
   def setK(k: Int): this.type = {
     this.k = k
     this
@@ -65,10 +70,14 @@ class KMeans private (

   /**
    * Maximum number of iterations to run.
+   * @since 1.4.0
    */
   def getMaxIterations: Int = maxIterations

-  /** Set maximum number of iterations to run. Default: 20. */
+  /**
+   * Set maximum number of iterations to run. Default: 20.
+   * @since 0.8.0
+   */
   def setMaxIterations(maxIterations: Int): this.type = {
     this.maxIterations = maxIterations
     this
@@ -76,6 +85,7 @@ class KMeans private (

   /**
    * The initialization algorithm. This can be either "random" or "k-means||".
+   * @since 1.4.0
    */
   def getInitializationMode: String = initializationMode

@@ -83,6 +93,7 @@ class KMeans private (
    * Set the initialization algorithm. This can be either "random" to choose random points as
    * initial cluster centers, or "k-means||" to use a parallel variant of k-means++
    * (Bahmani et al., Scalable K-Means++, VLDB 2012). Default: k-means||.
+   * @since 0.8.0
    */
   def setInitializationMode(initializationMode: String): this.type = {
     KMeans.validateInitMode(initializationMode)
@@ -93,6 +104,7 @@ class KMeans private (
   /**
    * :: Experimental ::
    * Number of runs of the algorithm to execute in parallel.
+   * @since 1.4.0
    */
   @Experimental
   def getRuns: Int = runs
@@ -102,6 +114,7 @@ class KMeans private (
    * Set the number of runs of the algorithm to execute in parallel. We initialize the algorithm
    * this many times with random starting conditions (configured by the initialization mode), then
    * return the best clustering found over any run. Default: 1.
+   * @since 0.8.0
    */
   @Experimental
   def setRuns(runs: Int): this.type = {
@@ -114,12 +127,14 @@ class KMeans private (

   /**
    * Number of steps for the k-means|| initialization mode
+   * @since 1.4.0
    */
   def getInitializationSteps: Int = initializationSteps

   /**
    * Set the number of steps for the k-means|| initialization mode. This is an advanced
    * setting -- the default of 5 is almost always enough. Default: 5.
+   * @since 0.8.0
    */
   def setInitializationSteps(initializationSteps: Int): this.type = {
     if (initializationSteps <= 0) {
@@ -131,12 +146,14 @@ class KMeans private (

   /**
    * The distance threshold within which we've consider centers to have converged.
+   * @since 1.4.0
    */
   def getEpsilon: Double = epsilon

   /**
    * Set the distance threshold within which we've consider centers to have converged.
    * If all centers move less than this Euclidean distance, we stop iterating one run.
+   * @since 0.8.0
    */
   def setEpsilon(epsilon: Double): this.type = {
     this.epsilon = epsilon
@@ -145,10 +162,14 @@ class KMeans private (

   /**
    * The random seed for cluster initialization.
+   * @since 1.4.0
    */
   def getSeed: Long = seed

-  /** Set the random seed for cluster initialization. */
+  /**
+   * Set the random seed for cluster initialization.
+   * @since 1.4.0
+   */
   def setSeed(seed: Long): this.type = {
     this.seed = seed
     this
@@ -162,6 +183,7 @@ class KMeans private (
    * Set the initial starting point, bypassing the random initialization or k-means||
    * The condition model.k == this.k must be met, failure results
    * in an IllegalArgumentException.
+   * @since 1.4.0
    */
   def setInitialModel(model: KMeansModel): this.type = {
     require(model.k == k, "mismatched cluster count")
@@ -172,6 +194,7 @@ class KMeans private (
   /**
    * Train a K-means model on the given set of points; `data` should be cached for high
    * performance, because this is an iterative algorithm.
+   * @since 0.8.0
    */
   def run(data: RDD[Vector]): KMeansModel = {

@@ -430,11 +453,14 @@ class KMeans private (
 /**
  * Top-level methods for calling K-means clustering.
+ * @since 0.8.0
  */
 object KMeans {

   // Initialization mode names
+  /** @since 0.8.0 */
   val RANDOM = "random"
+  /** @since 0.8.0 */
   val K_MEANS_PARALLEL = "k-means||"

   /**
@@ -446,6 +472,7 @@ object KMeans {
    * @param runs number of parallel runs, defaults to 1. The best model is returned.
    * @param initializationMode initialization model, either "random" or "k-means||" (default).
    * @param seed random seed value for cluster initialization
+   * @since 1.3.0
    */
   def train(
       data: RDD[Vector],
@@ -470,6 +497,7 @@ object KMeans {
    * @param maxIterations max number of iterations
    * @param runs number of parallel runs, defaults to 1. The best model is returned.
    * @param initializationMode initialization model, either "random" or "k-means||" (default).
+   * @since 0.8.0
    */
   def train(
       data: RDD[Vector],
@@ -486,6 +514,7 @@ object KMeans {

   /**
    * Trains a k-means model using specified parameters and the default values for unspecified.
+   * @since 0.8.0
    */
   def train(
       data: RDD[Vector],
@@ -496,6 +525,7 @@ object KMeans {

   /**
    * Trains a k-means model using specified parameters and the default values for unspecified.
+   * @since 0.8.0
    */
   def train(
       data: RDD[Vector],
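Both entry points annotated above remain available: the static KMeans.train helpers (hence the 0.8.0 tags) and the builder. A sketch reusing sc and the data RDD from the earlier snippet:

```scala
import org.apache.spark.mllib.clustering.KMeans

// Oldest public surface in the file: train(data, k, maxIterations).
val viaHelper = KMeans.train(data, 2, 20)

// Equivalent builder form; K_MEANS_PARALLEL is the "k-means||" constant tagged above.
val viaBuilder = new KMeans()
  .setK(2)
  .setMaxIterations(20)
  .setInitializationMode(KMeans.K_MEANS_PARALLEL)
  .setSeed(42L)
  .run(data)
```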
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
index 96359024fa..8de2087ceb 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
@@ -34,35 +34,52 @@ import org.apache.spark.sql.Row

 /**
  * A clustering model for K-means. Each point belongs to the cluster with the closest center.
+ * @since 0.8.0
  */
 class KMeansModel (
     val clusterCenters: Array[Vector]) extends Saveable with Serializable with PMMLExportable {

-  /** A Java-friendly constructor that takes an Iterable of Vectors. */
+  /**
+   * A Java-friendly constructor that takes an Iterable of Vectors.
+   * @since 1.4.0
+   */
   def this(centers: java.lang.Iterable[Vector]) = this(centers.asScala.toArray)

-  /** Total number of clusters. */
+  /**
+   * Total number of clusters.
+   * @since 0.8.0
+   */
   def k: Int = clusterCenters.length

-  /** Returns the cluster index that a given point belongs to. */
+  /**
+   * Returns the cluster index that a given point belongs to.
+   * @since 0.8.0
+   */
   def predict(point: Vector): Int = {
     KMeans.findClosest(clusterCentersWithNorm, new VectorWithNorm(point))._1
   }

-  /** Maps given points to their cluster indices. */
+  /**
+   * Maps given points to their cluster indices.
+   * @since 1.0.0
+   */
   def predict(points: RDD[Vector]): RDD[Int] = {
     val centersWithNorm = clusterCentersWithNorm
     val bcCentersWithNorm = points.context.broadcast(centersWithNorm)
     points.map(p => KMeans.findClosest(bcCentersWithNorm.value, new VectorWithNorm(p))._1)
   }

-  /** Maps given points to their cluster indices. */
+  /**
+   * Maps given points to their cluster indices.
+   * @since 1.0.0
+   */
   def predict(points: JavaRDD[Vector]): JavaRDD[java.lang.Integer] =
     predict(points.rdd).toJavaRDD().asInstanceOf[JavaRDD[java.lang.Integer]]

   /**
    * Return the K-means cost (sum of squared distances of points to their nearest center) for this
    * model on the given data.
+   * @since 0.8.0
    */
   def computeCost(data: RDD[Vector]): Double = {
     val centersWithNorm = clusterCentersWithNorm
@@ -73,6 +90,9 @@ class KMeansModel (
   private def clusterCentersWithNorm: Iterable[VectorWithNorm] =
     clusterCenters.map(new VectorWithNorm(_))

+  /**
+   * @since 1.4.0
+   */
   override def save(sc: SparkContext, path: String): Unit = {
     KMeansModel.SaveLoadV1_0.save(sc, this, path)
   }
@@ -80,7 +100,14 @@ class KMeansModel (
   override protected def formatVersion: String = "1.0"
 }

+/**
+ * @since 1.4.0
+ */
 object KMeansModel extends Loader[KMeansModel] {
+
+  /**
+   * @since 1.4.0
+   */
   override def load(sc: SparkContext, path: String): KMeansModel = {
     KMeansModel.SaveLoadV1_0.load(sc, path)
   }
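A short continuation showing the KMeansModel members tagged above (cost and persistence); the path is illustrative:

```scala
import org.apache.spark.mllib.clustering.KMeansModel

val wssse = viaBuilder.computeCost(data)   // sum of squared distances, @since 0.8.0
viaBuilder.save(sc, "/tmp/kmeans-model")   // @since 1.4.0
val back = KMeansModel.load(sc, "/tmp/kmeans-model")
assert(back.k == viaBuilder.k)
```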
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
index 0fc9b1ac4d..2a8c6acbae 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
@@ -43,6 +43,7 @@ import org.apache.spark.util.Utils
  *
  * @see [[http://en.wikipedia.org/wiki/Latent_Dirichlet_allocation Latent Dirichlet allocation
  *       (Wikipedia)]]
+ * @since 1.3.0
  */
 @Experimental
 class LDA private (
@@ -54,18 +55,25 @@ class LDA private (
     private var checkpointInterval: Int,
     private var ldaOptimizer: LDAOptimizer) extends Logging {

+  /**
+   * Constructs a LDA instance with default parameters.
+   * @since 1.3.0
+   */
   def this() = this(k = 10, maxIterations = 20, docConcentration = Vectors.dense(-1),
     topicConcentration = -1, seed = Utils.random.nextLong(), checkpointInterval = 10,
     ldaOptimizer = new EMLDAOptimizer)

   /**
    * Number of topics to infer. I.e., the number of soft cluster centers.
+   *
+   * @since 1.3.0
    */
   def getK: Int = k

   /**
    * Number of topics to infer. I.e., the number of soft cluster centers.
    * (default = 10)
+   * @since 1.3.0
    */
   def setK(k: Int): this.type = {
     require(k > 0, s"LDA k (number of clusters) must be > 0, but was set to $k")
@@ -78,6 +86,7 @@ class LDA private (
    * distributions over topics ("theta").
    *
    * This is the parameter to a Dirichlet distribution.
+   * @since 1.5.0
    */
   def getAsymmetricDocConcentration: Vector = this.docConcentration

@@ -87,6 +96,7 @@ class LDA private (
    *
    * This method assumes the Dirichlet distribution is symmetric and can be described by a single
    * [[Double]] parameter. It should fail if docConcentration is asymmetric.
+   * @since 1.3.0
    */
   def getDocConcentration: Double = {
     val parameter = docConcentration(0)
@@ -121,6 +131,7 @@ class LDA private (
    *     - Values should be >= 0
    *     - default = uniformly (1.0 / k), following the implementation from
    *       [[https://github.com/Blei-Lab/onlineldavb]].
+   * @since 1.5.0
    */
   def setDocConcentration(docConcentration: Vector): this.type = {
     require(docConcentration.size > 0, "docConcentration must have > 0 elements")
@@ -128,22 +139,37 @@ class LDA private (
     this
   }

-  /** Replicates a [[Double]] docConcentration to create a symmetric prior. */
+  /**
+   * Replicates a [[Double]] docConcentration to create a symmetric prior.
+   * @since 1.3.0
+   */
   def setDocConcentration(docConcentration: Double): this.type = {
     this.docConcentration = Vectors.dense(docConcentration)
     this
   }

-  /** Alias for [[getAsymmetricDocConcentration]] */
+  /**
+   * Alias for [[getAsymmetricDocConcentration]]
+   * @since 1.5.0
+   */
   def getAsymmetricAlpha: Vector = getAsymmetricDocConcentration

-  /** Alias for [[getDocConcentration]] */
+  /**
+   * Alias for [[getDocConcentration]]
+   * @since 1.3.0
+   */
   def getAlpha: Double = getDocConcentration

-  /** Alias for [[setDocConcentration()]] */
+  /**
+   * Alias for [[setDocConcentration()]]
+   * @since 1.5.0
+   */
   def setAlpha(alpha: Vector): this.type = setDocConcentration(alpha)

-  /** Alias for [[setDocConcentration()]] */
+  /**
+   * Alias for [[setDocConcentration()]]
+   * @since 1.3.0
+   */
   def setAlpha(alpha: Double): this.type = setDocConcentration(alpha)

   /**
@@ -154,6 +180,7 @@ class LDA private (
    *
    * Note: The topics' distributions over terms are called "beta" in the original LDA paper
    * by Blei et al., but are called "phi" in many later papers such as Asuncion et al., 2009.
+   * @since 1.3.0
    */
   def getTopicConcentration: Double = this.topicConcentration

@@ -178,36 +205,51 @@ class LDA private (
    *     - Value should be >= 0
    *     - default = (1.0 / k), following the implementation from
    *       [[https://github.com/Blei-Lab/onlineldavb]].
+   * @since 1.3.0
    */
   def setTopicConcentration(topicConcentration: Double): this.type = {
     this.topicConcentration = topicConcentration
     this
   }

-  /** Alias for [[getTopicConcentration]] */
+  /**
+   * Alias for [[getTopicConcentration]]
+   * @since 1.3.0
+   */
   def getBeta: Double = getTopicConcentration

-  /** Alias for [[setTopicConcentration()]] */
+  /**
+   * Alias for [[setTopicConcentration()]]
+   * @since 1.3.0
+   */
   def setBeta(beta: Double): this.type = setTopicConcentration(beta)

   /**
    * Maximum number of iterations for learning.
+   * @since 1.3.0
    */
   def getMaxIterations: Int = maxIterations

   /**
    * Maximum number of iterations for learning.
    * (default = 20)
+   * @since 1.3.0
    */
   def setMaxIterations(maxIterations: Int): this.type = {
     this.maxIterations = maxIterations
     this
   }

-  /** Random seed */
+  /**
+   * Random seed
+   * @since 1.3.0
+   */
   def getSeed: Long = seed

-  /** Random seed */
+  /**
+   * Random seed
+   * @since 1.3.0
+   */
   def setSeed(seed: Long): this.type = {
     this.seed = seed
     this
@@ -215,6 +257,7 @@ class LDA private (

   /**
    * Period (in iterations) between checkpoints.
+   * @since 1.3.0
    */
   def getCheckpointInterval: Int = checkpointInterval

@@ -225,6 +268,7 @@ class LDA private (
    * [[org.apache.spark.SparkContext]], this setting is ignored.
    *
    * @see [[org.apache.spark.SparkContext#setCheckpointDir]]
+   * @since 1.3.0
    */
   def setCheckpointInterval(checkpointInterval: Int): this.type = {
     this.checkpointInterval = checkpointInterval
@@ -236,6 +280,7 @@ class LDA private (
    * :: DeveloperApi ::
    *
    * LDAOptimizer used to perform the actual calculation
+   * @since 1.4.0
    */
   @DeveloperApi
   def getOptimizer: LDAOptimizer = ldaOptimizer
@@ -244,6 +289,7 @@ class LDA private (
    * :: DeveloperApi ::
    *
    * LDAOptimizer used to perform the actual calculation (default = EMLDAOptimizer)
+   * @since 1.4.0
    */
   @DeveloperApi
   def setOptimizer(optimizer: LDAOptimizer): this.type = {
@@ -254,6 +300,7 @@ class LDA private (
   /**
    * Set the LDAOptimizer used to perform the actual calculation by algorithm name.
    * Currently "em", "online" are supported.
+   * @since 1.4.0
    */
   def setOptimizer(optimizerName: String): this.type = {
     this.ldaOptimizer =
@@ -274,6 +321,7 @@ class LDA private (
    *                  (where the vocabulary size is the length of the vector).
    *                  Document IDs must be unique and >= 0.
    * @return Inferred LDA model
+   * @since 1.3.0
    */
   def run(documents: RDD[(Long, Vector)]): LDAModel = {
     val state = ldaOptimizer.initialize(documents, this)
@@ -289,7 +337,10 @@ class LDA private (
     state.getLDAModel(iterationTimes)
   }

-  /** Java-friendly version of [[run()]] */
+  /**
+   * Java-friendly version of [[run()]]
+   * @since 1.3.0
+   */
   def run(documents: JavaPairRDD[java.lang.Long, Vector]): LDAModel = {
     run(documents.rdd.asInstanceOf[RDD[(Long, Vector)]])
   }
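The LDA builder above composes the same way; run with the default EMLDAOptimizer returns a DistributedLDAModel. A sketch over an invented three-term toy corpus (with EM, the concentrations must be -1, i.e. auto, or > 1.0):

```scala
import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}
import org.apache.spark.mllib.linalg.Vectors

// (docId, termCountVector) pairs over a tiny vocabulary.
val corpus = sc.parallelize(Seq(
  (0L, Vectors.dense(1.0, 2.0, 0.0)),
  (1L, Vectors.dense(0.0, 3.0, 1.0)))).cache()
val ldaModel = new LDA()
  .setK(2)
  .setMaxIterations(10)
  .setDocConcentration(1.1)    // alias setAlpha, tagged above
  .setTopicConcentration(1.1)  // alias setBeta
  .run(corpus)
val distModel = ldaModel.asInstanceOf[DistributedLDAModel]
distModel.describeTopics(maxTermsPerTopic = 2).zipWithIndex.foreach {
  case ((terms, weights), i) => println(s"topic $i: ${terms.zip(weights).mkString(" ")}")
}
```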
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
index 82f05e4a18..b70e380c03 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -192,12 +192,24 @@ class LocalLDAModel private[clustering] (
     override protected[clustering] val gammaShape: Double = 100)
   extends LDAModel with Serializable {

+  /**
+   * @since 1.3.0
+   */
   override def k: Int = topics.numCols

+  /**
+   * @since 1.3.0
+   */
   override def vocabSize: Int = topics.numRows

+  /**
+   * @since 1.3.0
+   */
   override def topicsMatrix: Matrix = topics

+  /**
+   * @since 1.3.0
+   */
   override def describeTopics(maxTermsPerTopic: Int): Array[(Array[Int], Array[Double])] = {
     val brzTopics = topics.toBreeze.toDenseMatrix
     Range(0, k).map { topicIndex =>
@@ -210,6 +222,9 @@ class LocalLDAModel private[clustering] (

   override protected def formatVersion = "1.0"

+  /**
+   * @since 1.5.0
+   */
   override def save(sc: SparkContext, path: String): Unit = {
     LocalLDAModel.SaveLoadV1_0.save(sc, path, topicsMatrix, docConcentration, topicConcentration,
       gammaShape)
@@ -223,12 +238,16 @@ class LocalLDAModel private[clustering] (
    *
    * @param documents test corpus to use for calculating log likelihood
    * @return variational lower bound on the log likelihood of the entire corpus
+   * @since 1.5.0
    */
   def logLikelihood(documents: RDD[(Long, Vector)]): Double = logLikelihoodBound(documents,
     docConcentration, topicConcentration, topicsMatrix.toBreeze.toDenseMatrix, gammaShape, k,
     vocabSize)

-  /** Java-friendly version of [[logLikelihood]] */
+  /**
+   * Java-friendly version of [[logLikelihood]]
+   * @since 1.5.0
+   */
   def logLikelihood(documents: JavaPairRDD[java.lang.Long, Vector]): Double = {
     logLikelihood(documents.rdd.asInstanceOf[RDD[(Long, Vector)]])
   }
@@ -239,6 +258,7 @@ class LocalLDAModel private[clustering] (
    *
    * @param documents test corpus to use for calculating perplexity
    * @return Variational upper bound on log perplexity per token.
+   * @since 1.5.0
    */
   def logPerplexity(documents: RDD[(Long, Vector)]): Double = {
     val corpusTokenCount = documents
@@ -247,7 +267,9 @@ class LocalLDAModel private[clustering] (
     -logLikelihood(documents) / corpusTokenCount
   }

-  /** Java-friendly version of [[logPerplexity]] */
+  /** Java-friendly version of [[logPerplexity]]
+   * @since 1.5.0
+   */
   def logPerplexity(documents: JavaPairRDD[java.lang.Long, Vector]): Double = {
     logPerplexity(documents.rdd.asInstanceOf[RDD[(Long, Vector)]])
   }
@@ -325,6 +347,7 @@ class LocalLDAModel private[clustering] (
    * for each document.
    * @param documents documents to predict topic mixture distributions for
    * @return An RDD of (document ID, topic mixture distribution for document)
+   * @since 1.3.0
    */
   // TODO: declare in LDAModel and override once implemented in DistributedLDAModel
   def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = {
@@ -351,7 +374,10 @@ class LocalLDAModel private[clustering] (
     }
   }

-  /** Java-friendly version of [[topicDistributions]] */
+  /**
+   * Java-friendly version of [[topicDistributions]]
+   * @since 1.4.1
+   */
   def topicDistributions(
       documents: JavaPairRDD[java.lang.Long, Vector]): JavaPairRDD[java.lang.Long, Vector] = {
     val distributions = topicDistributions(documents.rdd.asInstanceOf[RDD[(Long, Vector)]])
@@ -425,6 +451,9 @@ object LocalLDAModel extends Loader[LocalLDAModel] {
     }
   }

+  /**
+   * @since 1.5.0
+   */
   override def load(sc: SparkContext, path: String): LocalLDAModel = {
     val (loadedClassName, loadedVersion, metadata) = Loader.loadMetadata(sc, path)
     implicit val formats = DefaultFormats
@@ -481,6 +510,7 @@ class DistributedLDAModel private[clustering] (
    * Convert model to a local model.
    * The local model stores the inferred topics but not the topic distributions for training
    * documents.
+   * @since 1.3.0
    */
   def toLocal: LocalLDAModel = new LocalLDAModel(topicsMatrix, docConcentration,
     topicConcentration, gammaShape)
@@ -491,6 +521,7 @@ class DistributedLDAModel private[clustering] (
    * No guarantees are given about the ordering of the topics.
    *
    * WARNING: This matrix is collected from an RDD. Beware memory usage when vocabSize, k are large.
+   * @since 1.3.0
    */
   override lazy val topicsMatrix: Matrix = {
     // Collect row-major topics
@@ -510,6 +541,9 @@ class DistributedLDAModel private[clustering] (
     Matrices.fromBreeze(brzTopics)
   }

+  /**
+   * @since 1.3.0
+   */
   override def describeTopics(maxTermsPerTopic: Int): Array[(Array[Int], Array[Double])] = {
     val numTopics = k
     // Note: N_k is not needed to find the top terms, but it is needed to normalize weights
@@ -548,6 +582,7 @@ class DistributedLDAModel private[clustering] (
    * @return Array over topics. Each element represent as a pair of matching arrays:
    *         (IDs for the documents, weights of the topic in these documents).
    *         For each topic, documents are sorted in order of decreasing topic weights.
+   * @since 1.5.0
    */
   def topDocumentsPerTopic(maxDocumentsPerTopic: Int): Array[(Array[Long], Array[Double])] = {
     val numTopics = k
@@ -587,6 +622,7 @@ class DistributedLDAModel private[clustering] (
    *  - This excludes the prior; for that, use [[logPrior]].
    *  - Even with [[logPrior]], this is NOT the same as the data log likelihood given the
    *    hyperparameters.
+   * @since 1.3.0
    */
   lazy val logLikelihood: Double = {
     // TODO: generalize this for asymmetric (non-scalar) alpha
@@ -612,7 +648,8 @@ class DistributedLDAModel private[clustering] (

   /**
    * Log probability of the current parameter estimate:
-   * log P(topics, topic distributions for docs | alpha, eta)
+   * log P(topics, topic distributions for docs | alpha, eta)
+   * @since 1.3.0
    */
   lazy val logPrior: Double = {
     // TODO: generalize this for asymmetric (non-scalar) alpha
@@ -644,6 +681,7 @@ class DistributedLDAModel private[clustering] (
    *            ("theta_doc").
    *
    * @return RDD of (document ID, topic distribution) pairs
+   * @since 1.3.0
    */
   def topicDistributions: RDD[(Long, Vector)] = {
     graph.vertices.filter(LDA.isDocumentVertex).map { case (docID, topicCounts) =>
@@ -651,7 +689,10 @@ class DistributedLDAModel private[clustering] (
     }
   }

-  /** Java-friendly version of [[topicDistributions]] */
+  /**
+   * Java-friendly version of [[topicDistributions]]
+   * @since 1.4.1
+   */
   def javaTopicDistributions: JavaPairRDD[java.lang.Long, Vector] = {
     JavaPairRDD.fromRDD(topicDistributions.asInstanceOf[RDD[(java.lang.Long, Vector)]])
   }
@@ -659,6 +700,7 @@ class DistributedLDAModel private[clustering] (
   /**
    * For each document, return the top k weighted topics for that document and their weights.
    * @return RDD of (doc ID, topic indices, topic weights)
+   * @since 1.5.0
    */
   def topTopicsPerDocument(k: Int): RDD[(Long, Array[Int], Array[Double])] = {
     graph.vertices.filter(LDA.isDocumentVertex).map { case (docID, topicCounts) =>
@@ -673,7 +715,10 @@ class DistributedLDAModel private[clustering] (
     }
   }

-  /** Java-friendly version of [[topTopicsPerDocument]] */
+  /**
+   * Java-friendly version of [[topTopicsPerDocument]]
+   * @since 1.5.0
+   */
   def javaTopTopicsPerDocument(k: Int): JavaRDD[(java.lang.Long, Array[Int], Array[Double])] = {
     val topics = topTopicsPerDocument(k)
     topics.asInstanceOf[RDD[(java.lang.Long, Array[Int], Array[Double])]].toJavaRDD()
@@ -684,6 +729,10 @@ class DistributedLDAModel private[clustering] (

   override protected def formatVersion = "1.0"

+  /**
+   * @since 1.5.0
+   */
   override def save(sc: SparkContext, path: String): Unit = {
     DistributedLDAModel.SaveLoadV1_0.save(
       sc, path, graph, globalTopicTotals, k, vocabSize, docConcentration, topicConcentration,
@@ -784,6 +833,9 @@ object DistributedLDAModel extends Loader[DistributedLDAModel] {

   }

+  /**
+   * @since 1.5.0
+   */
   override def load(sc: SparkContext, path: String): DistributedLDAModel = {
     val (loadedClassName, loadedVersion, metadata) = Loader.loadMetadata(sc, path)
     implicit val formats = DefaultFormats
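Continuing that sketch: toLocal collapses the distributed model, after which the 1.5.0 scoring methods annotated above apply; logPerplexity takes the same kind of (id, counts) corpus:

```scala
val localModel = distModel.toLocal                     // @since 1.3.0
println(s"bound on log perplexity/token: ${localModel.logPerplexity(corpus)}") // @since 1.5.0
localModel.topicDistributions(corpus).take(2).foreach(println)                 // theta_doc
```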
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index a0008f9c99..360241c808 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -35,6 +35,7 @@ import org.apache.spark.rdd.RDD
  *
  * An LDAOptimizer specifies which optimization/learning/inference algorithm to use, and it can
  * hold optimizer-specific parameters for users to set.
+ * @since 1.4.0
  */
 @DeveloperApi
 sealed trait LDAOptimizer {
@@ -73,7 +74,7 @@ sealed trait LDAOptimizer {
  *  - Paper which clearly explains several algorithms, including EM:
  *    Asuncion, Welling, Smyth, and Teh.
  *    "On Smoothing and Inference for Topic Models." UAI, 2009.
- *
+ * @since 1.4.0
  */
 @DeveloperApi
 final class EMLDAOptimizer extends LDAOptimizer {
@@ -225,6 +226,7 @@ final class EMLDAOptimizer extends LDAOptimizer {
  *
  * Original Online LDA paper:
  *   Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
+ * @since 1.4.0
  */
 @DeveloperApi
 final class OnlineLDAOptimizer extends LDAOptimizer {
@@ -274,6 +276,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
   /**
    * A (positive) learning parameter that downweights early iterations. Larger values make early
    * iterations count less.
+   * @since 1.4.0
    */
   def getTau0: Double = this.tau0

@@ -281,6 +284,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
    * A (positive) learning parameter that downweights early iterations. Larger values make early
    * iterations count less.
    * Default: 1024, following the original Online LDA paper.
+   * @since 1.4.0
    */
   def setTau0(tau0: Double): this.type = {
     require(tau0 > 0, s"LDA tau0 must be positive, but was set to $tau0")
@@ -290,6 +294,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {

   /**
    * Learning rate: exponential decay rate
+   * @since 1.4.0
    */
   def getKappa: Double = this.kappa

@@ -297,6 +302,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
    * Learning rate: exponential decay rate---should be between
    * (0.5, 1.0] to guarantee asymptotic convergence.
    * Default: 0.51, based on the original Online LDA paper.
+   * @since 1.4.0
    */
   def setKappa(kappa: Double): this.type = {
     require(kappa >= 0, s"Online LDA kappa must be nonnegative, but was set to $kappa")
@@ -306,6 +312,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {

   /**
    * Mini-batch fraction, which sets the fraction of document sampled and used in each iteration
+   * @since 1.4.0
    */
   def getMiniBatchFraction: Double = this.miniBatchFraction

@@ -318,6 +325,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
    *       maxIterations * miniBatchFraction >= 1.
    *
    * Default: 0.05, i.e., 5% of total documents.
+   * @since 1.4.0
    */
   def setMiniBatchFraction(miniBatchFraction: Double): this.type = {
     require(miniBatchFraction > 0.0 && miniBatchFraction <= 1.0,
@@ -329,6 +337,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
   /**
    * Optimize alpha, indicates whether alpha (Dirichlet parameter for document-topic distribution)
    * will be optimized during training.
+   * @since 1.5.0
    */
   def getOptimzeAlpha: Boolean = this.optimizeAlpha

@@ -336,6 +345,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
    * Sets whether to optimize alpha parameter during training.
    *
    * Default: false
+   * @since 1.5.0
    */
   def setOptimzeAlpha(optimizeAlpha: Boolean): this.type = {
     this.optimizeAlpha = optimizeAlpha
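Swapping in the online optimizer uses the 1.4.0 setters annotated above; note the constraint maxIterations * miniBatchFraction >= 1. A sketch reusing the toy corpus:

```scala
import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer}

val onlineModel = new LDA()
  .setK(2)
  .setMaxIterations(50)
  .setOptimizer(new OnlineLDAOptimizer()
    .setTau0(1024.0)             // downweights early iterations
    .setKappa(0.51)              // decay rate in (0.5, 1.0]
    .setMiniBatchFraction(0.05)) // fraction of docs sampled per iteration
  .run(corpus)                   // online optimizer yields a LocalLDAModel
```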
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
index 407e43a024..b4733ca975 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
@@ -39,12 +39,16 @@ import org.apache.spark.{Logging, SparkContext, SparkException}
  *
  * @param k number of clusters
  * @param assignments an RDD of clustering [[PowerIterationClustering#Assignment]]s
+ * @since 1.3.0
  */
 @Experimental
 class PowerIterationClusteringModel(
     val k: Int,
     val assignments: RDD[PowerIterationClustering.Assignment]) extends Saveable with Serializable {

+  /**
+   * @since 1.4.0
+   */
   override def save(sc: SparkContext, path: String): Unit = {
     PowerIterationClusteringModel.SaveLoadV1_0.save(sc, this, path)
   }
@@ -52,6 +56,9 @@ class PowerIterationClusteringModel(
   override protected def formatVersion: String = "1.0"
 }

+/**
+ * @since 1.4.0
+ */
 object PowerIterationClusteringModel extends Loader[PowerIterationClusteringModel] {
   override def load(sc: SparkContext, path: String): PowerIterationClusteringModel = {
     PowerIterationClusteringModel.SaveLoadV1_0.load(sc, path)
@@ -65,6 +72,9 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringModel] {
     private[clustering] val thisClassName =
       "org.apache.spark.mllib.clustering.PowerIterationClusteringModel"

+    /**
+     * @since 1.4.0
+     */
     def save(sc: SparkContext, model: PowerIterationClusteringModel, path: String): Unit = {
       val sqlContext = new SQLContext(sc)
       import sqlContext.implicits._
@@ -77,6 +87,9 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringModel] {
       dataRDD.write.parquet(Loader.dataPath(path))
     }

+    /**
+     * @since 1.4.0
+     */
     def load(sc: SparkContext, path: String): PowerIterationClusteringModel = {
       implicit val formats = DefaultFormats
       val sqlContext = new SQLContext(sc)
@@ -120,13 +133,16 @@ class PowerIterationClustering private[clustering] (

   import org.apache.spark.mllib.clustering.PowerIterationClustering._

-  /** Constructs a PIC instance with default parameters: {k: 2, maxIterations: 100,
-    * initMode: "random"}.
+  /**
+   * Constructs a PIC instance with default parameters: {k: 2, maxIterations: 100,
+   * initMode: "random"}.
+   * @since 1.3.0
    */
   def this() = this(k = 2, maxIterations = 100, initMode = "random")

   /**
    * Set the number of clusters.
+   * @since 1.3.0
    */
   def setK(k: Int): this.type = {
     this.k = k
@@ -135,6 +151,7 @@ class PowerIterationClustering private[clustering] (

   /**
    * Set maximum number of iterations of the power iteration loop
+   * @since 1.3.0
    */
   def setMaxIterations(maxIterations: Int): this.type = {
     this.maxIterations = maxIterations
@@ -144,6 +161,7 @@ class PowerIterationClustering private[clustering] (
   /**
    * Set the initialization mode. This can be either "random" to use a random vector
    * as vertex properties, or "degree" to use normalized sum similarities. Default: random.
+   * @since 1.3.0
    */
   def setInitializationMode(mode: String): this.type = {
     this.initMode = mode match {
@@ -164,6 +182,7 @@ class PowerIterationClustering private[clustering] (
    *            assume s,,ij,, = 0.0.
    *
    * @return a [[PowerIterationClusteringModel]] that contains the clustering result
+   * @since 1.5.0
    */
   def run(graph: Graph[Double, Double]): PowerIterationClusteringModel = {
     val w = normalize(graph)
@@ -185,6 +204,7 @@ class PowerIterationClustering private[clustering] (
    *                     assume s,,ij,, = 0.0.
    *
    * @return a [[PowerIterationClusteringModel]] that contains the clustering result
+   * @since 1.3.0
    */
   def run(similarities: RDD[(Long, Long, Double)]): PowerIterationClusteringModel = {
     val w = normalize(similarities)
@@ -197,6 +217,7 @@ class PowerIterationClustering private[clustering] (

   /**
    * A Java-friendly version of [[PowerIterationClustering.run]].
+   * @since 1.3.0
    */
   def run(similarities: JavaRDD[(java.lang.Long, java.lang.Long, java.lang.Double)])
     : PowerIterationClusteringModel = {
@@ -221,6 +242,9 @@ class PowerIterationClustering private[clustering] (
   }
 }

+/**
+ * @since 1.3.0
+ */
 @Experimental
 object PowerIterationClustering extends Logging {

@@ -229,6 +253,7 @@ object PowerIterationClustering extends Logging {
    * Cluster assignment.
    * @param id node id
    * @param cluster assigned cluster id
+   * @since 1.3.0
    */
   @Experimental
   case class Assignment(id: Long, cluster: Int)
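Power iteration clustering consumes an affinity graph rather than feature vectors; similarities must be nonnegative and symmetric. A sketch over an invented five-node graph:

```scala
import org.apache.spark.mllib.clustering.PowerIterationClustering

val similarities = sc.parallelize(Seq(
  (0L, 1L, 0.9), (1L, 2L, 0.9), (2L, 3L, 0.1), (3L, 4L, 0.9)))
val picModel = new PowerIterationClustering()
  .setK(2)
  .setMaxIterations(20)
  .setInitializationMode("degree")  // or the default "random"
  .run(similarities)                // @since 1.3.0 (graph overload: 1.5.0)
picModel.assignments.collect().foreach { a =>
  println(s"node ${a.id} -> cluster ${a.cluster}")
}
```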
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
index d9b34cec64..a915804b02 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
@@ -63,6 +63,7 @@ import org.apache.spark.util.random.XORShiftRandom
  * such that at time t + h the discount applied to the data from t is 0.5.
  * The definition remains the same whether the time unit is given
  * as batches or points.
+ * @since 1.2.0
  *
  */
 @Experimental
@@ -70,7 +71,10 @@ class StreamingKMeansModel(
     override val clusterCenters: Array[Vector],
     val clusterWeights: Array[Double]) extends KMeansModel(clusterCenters) with Logging {

-  /** Perform a k-means update on a batch of data. */
+  /**
+   * Perform a k-means update on a batch of data.
+   * @since 1.2.0
+   */
   def update(data: RDD[Vector], decayFactor: Double, timeUnit: String): StreamingKMeansModel = {

     // find nearest cluster to each point
@@ -82,6 +86,7 @@ class StreamingKMeansModel(
       (p1._1, p1._2 + p2._2)
     }
     val dim = clusterCenters(0).size
+
     val pointStats: Array[(Int, (Vector, Long))] = closest
       .aggregateByKey((Vectors.zeros(dim), 0L))(mergeContribs, mergeContribs)
       .collect()
@@ -161,6 +166,7 @@ class StreamingKMeansModel(
  *   .setRandomCenters(5, 100.0)
  *   .trainOn(DStream)
  * }}}
+ * @since 1.2.0
  */
 @Experimental
 class StreamingKMeans(
@@ -168,23 +174,33 @@ class StreamingKMeans(
     var decayFactor: Double,
     var timeUnit: String) extends Logging with Serializable {

+  /** @since 1.2.0 */
   def this() = this(2, 1.0, StreamingKMeans.BATCHES)

   protected var model: StreamingKMeansModel = new StreamingKMeansModel(null, null)

-  /** Set the number of clusters. */
+  /**
+   * Set the number of clusters.
+   * @since 1.2.0
+   */
   def setK(k: Int): this.type = {
     this.k = k
     this
   }

-  /** Set the decay factor directly (for forgetful algorithms). */
+  /**
+   * Set the decay factor directly (for forgetful algorithms).
+   * @since 1.2.0
+   */
   def setDecayFactor(a: Double): this.type = {
     this.decayFactor = a
     this
   }

-  /** Set the half life and time unit ("batches" or "points") for forgetful algorithms. */
+  /**
+   * Set the half life and time unit ("batches" or "points") for forgetful algorithms.
+   * @since 1.2.0
+   */
   def setHalfLife(halfLife: Double, timeUnit: String): this.type = {
     if (timeUnit != StreamingKMeans.BATCHES && timeUnit != StreamingKMeans.POINTS) {
       throw new IllegalArgumentException("Invalid time unit for decay: " + timeUnit)
@@ -195,7 +211,10 @@ class StreamingKMeans(
     this
   }

-  /** Specify initial centers directly. */
+  /**
+   * Specify initial centers directly.
+   * @since 1.2.0
+   */
   def setInitialCenters(centers: Array[Vector], weights: Array[Double]): this.type = {
     model = new StreamingKMeansModel(centers, weights)
     this
@@ -207,6 +226,7 @@ class StreamingKMeans(
    * @param dim Number of dimensions
    * @param weight Weight for each center
    * @param seed Random seed
+   * @since 1.2.0
    */
   def setRandomCenters(dim: Int, weight: Double, seed: Long = Utils.random.nextLong): this.type = {
     val random = new XORShiftRandom(seed)
@@ -216,7 +236,10 @@ class StreamingKMeans(
     this
   }

-  /** Return the latest model. */
+  /**
+   * Return the latest model.
+   * @since 1.2.0
+   */
   def latestModel(): StreamingKMeansModel = {
     model
   }
@@ -228,6 +251,7 @@ class StreamingKMeans(
    * and updates the model using each batch of data from the stream.
    *
    * @param data DStream containing vector data
+   * @since 1.2.0
    */
   def trainOn(data: DStream[Vector]) {
     assertInitialized()
@@ -236,7 +260,10 @@ class StreamingKMeans(
     }
   }

-  /** Java-friendly version of `trainOn`. */
+  /**
+   * Java-friendly version of `trainOn`.
+   * @since 1.4.0
+   */
   def trainOn(data: JavaDStream[Vector]): Unit = trainOn(data.dstream)

@@ -244,13 +271,17 @@ class StreamingKMeans(
   /**
    * Use the model to make predictions on batches of data from a DStream.
    *
    * @param data DStream containing vector data
    * @return DStream containing predictions
+   * @since 1.2.0
    */
   def predictOn(data: DStream[Vector]): DStream[Int] = {
     assertInitialized()
     data.map(model.predict)
   }

-  /** Java-friendly version of `predictOn`. */
+  /**
+   * Java-friendly version of `predictOn`.
+   * @since 1.4.0
+   */
   def predictOn(data: JavaDStream[Vector]): JavaDStream[java.lang.Integer] = {
     JavaDStream.fromDStream(predictOn(data.dstream).asInstanceOf[DStream[java.lang.Integer]])
   }

@@ -261,13 +292,17 @@ class StreamingKMeans(
   /**
    * Use the model to make predictions on the values of a DStream and carry over its keys.
    *
    * @param data DStream containing (key, feature vector) pairs
    * @tparam K key type
    * @return DStream containing the input keys and the predictions as values
+   * @since 1.2.0
    */
   def predictOnValues[K: ClassTag](data: DStream[(K, Vector)]): DStream[(K, Int)] = {
     assertInitialized()
     data.mapValues(model.predict)
   }

-  /** Java-friendly version of `predictOnValues`. */
+  /**
+   * Java-friendly version of `predictOnValues`.
+   * @since 1.4.0
+   */
   def predictOnValues[K](
       data: JavaPairDStream[K, Vector]): JavaPairDStream[K, java.lang.Integer] = {
     implicit val tag = fakeClassTag[K]
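Finally, the streaming estimator wires the same model into DStreams. A sketch assuming a StreamingContext and a directory of vector text files (both illustrative):

```scala
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(1))
val trainingData = ssc.textFileStream("/tmp/training").map(Vectors.parse)
val skm = new StreamingKMeans()
  .setK(2)
  .setDecayFactor(0.5)   // or setHalfLife(5.0, "batches"), as annotated above
  .setRandomCenters(dim = 2, weight = 0.0, seed = 42L)
skm.trainOn(trainingData)            // updates latestModel() once per batch
skm.predictOn(trainingData).print()
ssc.start()
ssc.awaitTermination()
```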