author    Xiangrui Meng <meng@databricks.com>  2015-08-19 13:17:26 -0700
committer Xiangrui Meng <meng@databricks.com>  2015-08-19 13:17:26 -0700
commit    5b62bef8cbf73f910513ef3b1f557aa94b384854 (patch)
tree      1f49cffb09ed5bac692b1f796ee6b18b21a33600 /mllib
parent    d898c33f774b9a3db2fb6aa8f0cb2c2ac6004b58 (diff)
[SPARK-8918] [MLLIB] [DOC] Add @since tags to mllib.clustering
This continues the work from #8256. I removed `since` tags from private/protected/local methods/variables (see https://github.com/apache/spark/commit/72fdeb64630470f6f46cf3eed8ffbfe83a7c4659). MechCoder

Closes #8256

Author: Xiangrui Meng <meng@databricks.com>
Author: Xiaoqing Wang <spark445@126.com>
Author: MechCoder <manojkumarsivaraj334@gmail.com>

Closes #8288 from mengxr/SPARK-8918.
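The convention applied throughout this patch: every public constructor, method, and value in mllib.clustering gets a Scaladoc @since tag naming the release that introduced it, while private/protected/local members stay untagged. A minimal sketch of the pattern (the ExampleClusterer class and its version numbers below are illustrative only, not part of the patch):

package org.apache.spark.mllib.clustering

/**
 * Illustrative example of the Scaladoc convention applied by this patch.
 * @since 1.3.0
 */
class ExampleClusterer private (private var k: Int) {

  /**
   * Constructs a default instance. The default parameters are {k: 2}.
   * @since 1.3.0
   */
  def this() = this(2)

  /**
   * Set the number of clusters. Default: 2.
   * @since 1.3.0
   */
  def setK(k: Int): this.type = {
    this.k = k
    this
  }

  // Private members deliberately carry no @since tag.
  private def validate(): Unit = require(k > 0, "k must be positive")
}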
Diffstat (limited to 'mllib')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala          | 56
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala     | 32
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala                   | 36
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala              | 37
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala                      | 71
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala                 | 64
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala             | 12
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala | 29
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala          | 53
9 files changed, 338 insertions(+), 52 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala
index e459367333..bc27b1fe73 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala
@@ -62,6 +62,7 @@ class GaussianMixture private (
/**
* Constructs a default instance. The default parameters are {k: 2, convergenceTol: 0.01,
* maxIterations: 100, seed: random}.
+ * @since 1.3.0
*/
def this() = this(2, 0.01, 100, Utils.random.nextLong())
@@ -72,9 +73,11 @@ class GaussianMixture private (
// default random starting point
private var initialModel: Option[GaussianMixtureModel] = None
- /** Set the initial GMM starting point, bypassing the random initialization.
- * You must call setK() prior to calling this method, and the condition
- * (model.k == this.k) must be met; failure will result in an IllegalArgumentException
+ /**
+ * Set the initial GMM starting point, bypassing the random initialization.
+ * You must call setK() prior to calling this method, and the condition
+ * (model.k == this.k) must be met; failure will result in an IllegalArgumentException
+ * @since 1.3.0
*/
def setInitialModel(model: GaussianMixtureModel): this.type = {
if (model.k == k) {
@@ -85,30 +88,46 @@ class GaussianMixture private (
this
}
- /** Return the user supplied initial GMM, if supplied */
+ /**
+ * Return the user supplied initial GMM, if supplied
+ * @since 1.3.0
+ */
def getInitialModel: Option[GaussianMixtureModel] = initialModel
- /** Set the number of Gaussians in the mixture model. Default: 2 */
+ /**
+ * Set the number of Gaussians in the mixture model. Default: 2
+ * @since 1.3.0
+ */
def setK(k: Int): this.type = {
this.k = k
this
}
- /** Return the number of Gaussians in the mixture model */
+ /**
+ * Return the number of Gaussians in the mixture model
+ * @since 1.3.0
+ */
def getK: Int = k
- /** Set the maximum number of iterations to run. Default: 100 */
+ /**
+ * Set the maximum number of iterations to run. Default: 100
+ * @since 1.3.0
+ */
def setMaxIterations(maxIterations: Int): this.type = {
this.maxIterations = maxIterations
this
}
- /** Return the maximum number of iterations to run */
+ /**
+ * Return the maximum number of iterations to run
+ * @since 1.3.0
+ */
def getMaxIterations: Int = maxIterations
/**
* Set the largest change in log-likelihood at which convergence is
* considered to have occurred.
+ * @since 1.3.0
*/
def setConvergenceTol(convergenceTol: Double): this.type = {
this.convergenceTol = convergenceTol
@@ -118,19 +137,29 @@ class GaussianMixture private (
/**
* Return the largest change in log-likelihood at which convergence is
* considered to have occurred.
+ * @since 1.3.0
*/
def getConvergenceTol: Double = convergenceTol
- /** Set the random seed */
+ /**
+ * Set the random seed
+ * @since 1.3.0
+ */
def setSeed(seed: Long): this.type = {
this.seed = seed
this
}
- /** Return the random seed */
+ /**
+ * Return the random seed
+ * @since 1.3.0
+ */
def getSeed: Long = seed
- /** Perform expectation maximization */
+ /**
+ * Perform expectation maximization
+ * @since 1.3.0
+ */
def run(data: RDD[Vector]): GaussianMixtureModel = {
val sc = data.sparkContext
@@ -204,7 +233,10 @@ class GaussianMixture private (
new GaussianMixtureModel(weights, gaussians)
}
- /** Java-friendly version of [[run()]] */
+ /**
+ * Java-friendly version of [[run()]]
+ * @since 1.3.0
+ */
def run(data: JavaRDD[Vector]): GaussianMixtureModel = run(data.rdd)
private def updateWeightsAndGaussians(
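As a usage note (not part of the patch), the @since-tagged GaussianMixture API above is a builder-style estimator. A minimal sketch, assuming an existing SparkContext `sc`; the data points are made up:

import org.apache.spark.mllib.clustering.GaussianMixture
import org.apache.spark.mllib.linalg.Vectors

// Toy two-cluster data, for illustration only.
val data = sc.parallelize(Seq(
  Vectors.dense(0.1, 0.2), Vectors.dense(0.3, 0.1),
  Vectors.dense(9.0, 8.5), Vectors.dense(8.7, 9.2)))

// Fit a 2-component mixture via expectation maximization (run()).
val gmm = new GaussianMixture()
  .setK(2)
  .setMaxIterations(100)
  .setConvergenceTol(0.01)
  .setSeed(42L)
  .run(data)

gmm.gaussians.zip(gmm.weights).foreach { case (g, w) =>
  println(s"weight=$w mu=${g.mu} sigma=${g.sigma}")
}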
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
index 76aeebd703..2fa0473737 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
@@ -43,6 +43,7 @@ import org.apache.spark.sql.{SQLContext, Row}
* the weight for Gaussian i, and weights.sum == 1
* @param gaussians Array of MultivariateGaussian where gaussians(i) represents
* the Multivariate Gaussian (Normal) Distribution for Gaussian i
+ * @since 1.3.0
*/
@Experimental
class GaussianMixtureModel(
@@ -53,32 +54,48 @@ class GaussianMixtureModel(
override protected def formatVersion = "1.0"
+ /**
+ * @since 1.4.0
+ */
override def save(sc: SparkContext, path: String): Unit = {
GaussianMixtureModel.SaveLoadV1_0.save(sc, path, weights, gaussians)
}
- /** Number of gaussians in mixture */
+ /**
+ * Number of gaussians in mixture
+ * @since 1.3.0
+ */
def k: Int = weights.length
- /** Maps given points to their cluster indices. */
+ /**
+ * Maps given points to their cluster indices.
+ * @since 1.3.0
+ */
def predict(points: RDD[Vector]): RDD[Int] = {
val responsibilityMatrix = predictSoft(points)
responsibilityMatrix.map(r => r.indexOf(r.max))
}
- /** Maps given point to its cluster index. */
+ /**
+ * Maps given point to its cluster index.
+ * @since 1.5.0
+ */
def predict(point: Vector): Int = {
val r = computeSoftAssignments(point.toBreeze.toDenseVector, gaussians, weights, k)
r.indexOf(r.max)
}
- /** Java-friendly version of [[predict()]] */
+ /**
+ * Java-friendly version of [[predict()]]
+ * @since 1.4.0
+ */
def predict(points: JavaRDD[Vector]): JavaRDD[java.lang.Integer] =
predict(points.rdd).toJavaRDD().asInstanceOf[JavaRDD[java.lang.Integer]]
/**
* Given the input vectors, return the membership value of each vector
* to all mixture components.
+ * @since 1.3.0
*/
def predictSoft(points: RDD[Vector]): RDD[Array[Double]] = {
val sc = points.sparkContext
@@ -91,6 +108,7 @@ class GaussianMixtureModel(
/**
* Given the input vector, return the membership values to all mixture components.
+ * @since 1.4.0
*/
def predictSoft(point: Vector): Array[Double] = {
computeSoftAssignments(point.toBreeze.toDenseVector, gaussians, weights, k)
@@ -115,6 +133,9 @@ class GaussianMixtureModel(
}
}
+/**
+ * @since 1.4.0
+ */
@Experimental
object GaussianMixtureModel extends Loader[GaussianMixtureModel] {
@@ -165,6 +186,9 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] {
}
}
+ /**
+ * @since 1.4.0
+ */
override def load(sc: SparkContext, path: String) : GaussianMixtureModel = {
val (loadedClassName, version, metadata) = Loader.loadMetadata(sc, path)
implicit val formats = DefaultFormats
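The model-side methods tagged above (predict, predictSoft, save, load) compose as follows; a sketch assuming `sc`, the fitted `gmm` from the previous sketch, and a hypothetical output path:

import org.apache.spark.mllib.clustering.GaussianMixtureModel
import org.apache.spark.mllib.linalg.Vectors

val point = Vectors.dense(0.2, 0.15)
val hardAssignment: Int = gmm.predict(point)                // @since 1.5.0
val softAssignment: Array[Double] = gmm.predictSoft(point)  // @since 1.4.0

// Persist and reload (the path is hypothetical).
gmm.save(sc, "/tmp/gmm-model")                              // @since 1.4.0
val reloaded = GaussianMixtureModel.load(sc, "/tmp/gmm-model")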
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index 0a65403f4e..9ef6834e5e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -49,15 +49,20 @@ class KMeans private (
/**
* Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, runs: 1,
* initializationMode: "k-means||", initializationSteps: 5, epsilon: 1e-4, seed: random}.
+ * @since 0.8.0
*/
def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4, Utils.random.nextLong())
/**
* Number of clusters to create (k).
+ * @since 1.4.0
*/
def getK: Int = k
- /** Set the number of clusters to create (k). Default: 2. */
+ /**
+ * Set the number of clusters to create (k). Default: 2.
+ * @since 0.8.0
+ */
def setK(k: Int): this.type = {
this.k = k
this
@@ -65,10 +70,14 @@ class KMeans private (
/**
* Maximum number of iterations to run.
+ * @since 1.4.0
*/
def getMaxIterations: Int = maxIterations
- /** Set maximum number of iterations to run. Default: 20. */
+ /**
+ * Set maximum number of iterations to run. Default: 20.
+ * @since 0.8.0
+ */
def setMaxIterations(maxIterations: Int): this.type = {
this.maxIterations = maxIterations
this
@@ -76,6 +85,7 @@ class KMeans private (
/**
* The initialization algorithm. This can be either "random" or "k-means||".
+ * @since 1.4.0
*/
def getInitializationMode: String = initializationMode
@@ -83,6 +93,7 @@ class KMeans private (
* Set the initialization algorithm. This can be either "random" to choose random points as
* initial cluster centers, or "k-means||" to use a parallel variant of k-means++
* (Bahmani et al., Scalable K-Means++, VLDB 2012). Default: k-means||.
+ * @since 0.8.0
*/
def setInitializationMode(initializationMode: String): this.type = {
KMeans.validateInitMode(initializationMode)
@@ -93,6 +104,7 @@ class KMeans private (
/**
* :: Experimental ::
* Number of runs of the algorithm to execute in parallel.
+ * @since 1.4.0
*/
@Experimental
def getRuns: Int = runs
@@ -102,6 +114,7 @@ class KMeans private (
* Set the number of runs of the algorithm to execute in parallel. We initialize the algorithm
* this many times with random starting conditions (configured by the initialization mode), then
* return the best clustering found over any run. Default: 1.
+ * @since 0.8.0
*/
@Experimental
def setRuns(runs: Int): this.type = {
@@ -114,12 +127,14 @@ class KMeans private (
/**
* Number of steps for the k-means|| initialization mode
+ * @since 1.4.0
*/
def getInitializationSteps: Int = initializationSteps
/**
* Set the number of steps for the k-means|| initialization mode. This is an advanced
* setting -- the default of 5 is almost always enough. Default: 5.
+ * @since 0.8.0
*/
def setInitializationSteps(initializationSteps: Int): this.type = {
if (initializationSteps <= 0) {
@@ -131,12 +146,14 @@ class KMeans private (
/**
* The distance threshold within which we consider centers to have converged.
+ * @since 1.4.0
*/
def getEpsilon: Double = epsilon
/**
* Set the distance threshold within which we consider centers to have converged.
* If all centers move less than this Euclidean distance, we stop iterating one run.
+ * @since 0.8.0
*/
def setEpsilon(epsilon: Double): this.type = {
this.epsilon = epsilon
@@ -145,10 +162,14 @@ class KMeans private (
/**
* The random seed for cluster initialization.
+ * @since 1.4.0
*/
def getSeed: Long = seed
- /** Set the random seed for cluster initialization. */
+ /**
+ * Set the random seed for cluster initialization.
+ * @since 1.4.0
+ */
def setSeed(seed: Long): this.type = {
this.seed = seed
this
@@ -162,6 +183,7 @@ class KMeans private (
* Set the initial starting point, bypassing the random initialization or k-means||
* The condition model.k == this.k must be met, failure results
* in an IllegalArgumentException.
+ * @since 1.4.0
*/
def setInitialModel(model: KMeansModel): this.type = {
require(model.k == k, "mismatched cluster count")
@@ -172,6 +194,7 @@ class KMeans private (
/**
* Train a K-means model on the given set of points; `data` should be cached for high
* performance, because this is an iterative algorithm.
+ * @since 0.8.0
*/
def run(data: RDD[Vector]): KMeansModel = {
@@ -430,11 +453,14 @@ class KMeans private (
/**
* Top-level methods for calling K-means clustering.
+ * @since 0.8.0
*/
object KMeans {
// Initialization mode names
+ /** @since 0.8.0 */
val RANDOM = "random"
+ /** @since 0.8.0 */
val K_MEANS_PARALLEL = "k-means||"
/**
@@ -446,6 +472,7 @@ object KMeans {
* @param runs number of parallel runs, defaults to 1. The best model is returned.
* @param initializationMode initialization model, either "random" or "k-means||" (default).
* @param seed random seed value for cluster initialization
+ * @since 1.3.0
*/
def train(
data: RDD[Vector],
@@ -470,6 +497,7 @@ object KMeans {
* @param maxIterations max number of iterations
* @param runs number of parallel runs, defaults to 1. The best model is returned.
* @param initializationMode initialization model, either "random" or "k-means||" (default).
+ * @since 0.8.0
*/
def train(
data: RDD[Vector],
@@ -486,6 +514,7 @@ object KMeans {
/**
* Trains a k-means model using specified parameters and the default values for unspecified.
+ * @since 0.8.0
*/
def train(
data: RDD[Vector],
@@ -496,6 +525,7 @@ object KMeans {
/**
* Trains a k-means model using specified parameters and the default values for unspecified.
+ * @since 0.8.0
*/
def train(
data: RDD[Vector],
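A usage sketch for the KMeans API tagged above (not part of the patch), assuming an existing SparkContext `sc` and toy data:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val points = sc.parallelize(Seq(
  Vectors.dense(1.0, 1.0), Vectors.dense(1.2, 0.8),
  Vectors.dense(8.0, 8.0), Vectors.dense(7.8, 8.3))).cache()

// Builder style: run() is the @since 0.8.0 entry point.
val model1 = new KMeans()
  .setK(2)
  .setMaxIterations(20)
  .setInitializationMode(KMeans.K_MEANS_PARALLEL)
  .run(points)

// Convenience overload with default values for unspecified parameters.
val model2 = KMeans.train(points, 2, 20)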
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
index 96359024fa..8de2087ceb 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
@@ -34,35 +34,52 @@ import org.apache.spark.sql.Row
/**
* A clustering model for K-means. Each point belongs to the cluster with the closest center.
+ * @since 0.8.0
*/
class KMeansModel (
val clusterCenters: Array[Vector]) extends Saveable with Serializable with PMMLExportable {
- /** A Java-friendly constructor that takes an Iterable of Vectors. */
+ /**
+ * A Java-friendly constructor that takes an Iterable of Vectors.
+ * @since 1.4.0
+ */
def this(centers: java.lang.Iterable[Vector]) = this(centers.asScala.toArray)
- /** Total number of clusters. */
+ /**
+ * Total number of clusters.
+ * @since 0.8.0
+ */
def k: Int = clusterCenters.length
- /** Returns the cluster index that a given point belongs to. */
+ /**
+ * Returns the cluster index that a given point belongs to.
+ * @since 0.8.0
+ */
def predict(point: Vector): Int = {
KMeans.findClosest(clusterCentersWithNorm, new VectorWithNorm(point))._1
}
- /** Maps given points to their cluster indices. */
+ /**
+ * Maps given points to their cluster indices.
+ * @since 1.0.0
+ */
def predict(points: RDD[Vector]): RDD[Int] = {
val centersWithNorm = clusterCentersWithNorm
val bcCentersWithNorm = points.context.broadcast(centersWithNorm)
points.map(p => KMeans.findClosest(bcCentersWithNorm.value, new VectorWithNorm(p))._1)
}
- /** Maps given points to their cluster indices. */
+ /**
+ * Maps given points to their cluster indices.
+ * @since 1.0.0
+ */
def predict(points: JavaRDD[Vector]): JavaRDD[java.lang.Integer] =
predict(points.rdd).toJavaRDD().asInstanceOf[JavaRDD[java.lang.Integer]]
/**
* Return the K-means cost (sum of squared distances of points to their nearest center) for this
* model on the given data.
+ * @since 0.8.0
*/
def computeCost(data: RDD[Vector]): Double = {
val centersWithNorm = clusterCentersWithNorm
@@ -73,6 +90,9 @@ class KMeansModel (
private def clusterCentersWithNorm: Iterable[VectorWithNorm] =
clusterCenters.map(new VectorWithNorm(_))
+ /**
+ * @since 1.4.0
+ */
override def save(sc: SparkContext, path: String): Unit = {
KMeansModel.SaveLoadV1_0.save(sc, this, path)
}
@@ -80,7 +100,14 @@ class KMeansModel (
override protected def formatVersion: String = "1.0"
}
+/**
+ * @since 1.4.0
+ */
object KMeansModel extends Loader[KMeansModel] {
+
+ /**
+ * @since 1.4.0
+ */
override def load(sc: SparkContext, path: String): KMeansModel = {
KMeansModel.SaveLoadV1_0.load(sc, path)
}
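And the corresponding KMeansModel methods tagged above; a sketch assuming `sc`, `points`, and `model1` from the previous sketch, with a hypothetical save path:

import org.apache.spark.mllib.clustering.KMeansModel
import org.apache.spark.mllib.linalg.Vectors

val cluster: Int = model1.predict(Vectors.dense(1.1, 0.9))  // @since 0.8.0
val wssse: Double = model1.computeCost(points)              // @since 0.8.0

// Persist and reload (hypothetical path).
model1.save(sc, "/tmp/kmeans-model")                        // @since 1.4.0
val sameModel = KMeansModel.load(sc, "/tmp/kmeans-model")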
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
index 0fc9b1ac4d..2a8c6acbae 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
@@ -43,6 +43,7 @@ import org.apache.spark.util.Utils
*
* @see [[http://en.wikipedia.org/wiki/Latent_Dirichlet_allocation Latent Dirichlet allocation
* (Wikipedia)]]
+ * @since 1.3.0
*/
@Experimental
class LDA private (
@@ -54,18 +55,25 @@ class LDA private (
private var checkpointInterval: Int,
private var ldaOptimizer: LDAOptimizer) extends Logging {
+ /**
+ * Constructs a LDA instance with default parameters.
+ * @since 1.3.0
+ */
def this() = this(k = 10, maxIterations = 20, docConcentration = Vectors.dense(-1),
topicConcentration = -1, seed = Utils.random.nextLong(), checkpointInterval = 10,
ldaOptimizer = new EMLDAOptimizer)
/**
* Number of topics to infer. I.e., the number of soft cluster centers.
+ *
+ * @since 1.3.0
*/
def getK: Int = k
/**
* Number of topics to infer. I.e., the number of soft cluster centers.
* (default = 10)
+ * @since 1.3.0
*/
def setK(k: Int): this.type = {
require(k > 0, s"LDA k (number of clusters) must be > 0, but was set to $k")
@@ -78,6 +86,7 @@ class LDA private (
* distributions over topics ("theta").
*
* This is the parameter to a Dirichlet distribution.
+ * @since 1.5.0
*/
def getAsymmetricDocConcentration: Vector = this.docConcentration
@@ -87,6 +96,7 @@ class LDA private (
*
* This method assumes the Dirichlet distribution is symmetric and can be described by a single
* [[Double]] parameter. It should fail if docConcentration is asymmetric.
+ * @since 1.3.0
*/
def getDocConcentration: Double = {
val parameter = docConcentration(0)
@@ -121,6 +131,7 @@ class LDA private (
* - Values should be >= 0
* - default = uniformly (1.0 / k), following the implementation from
* [[https://github.com/Blei-Lab/onlineldavb]].
+ * @since 1.5.0
*/
def setDocConcentration(docConcentration: Vector): this.type = {
require(docConcentration.size > 0, "docConcentration must have > 0 elements")
@@ -128,22 +139,37 @@ class LDA private (
this
}
- /** Replicates a [[Double]] docConcentration to create a symmetric prior. */
+ /**
+ * Replicates a [[Double]] docConcentration to create a symmetric prior.
+ * @since 1.3.0
+ */
def setDocConcentration(docConcentration: Double): this.type = {
this.docConcentration = Vectors.dense(docConcentration)
this
}
- /** Alias for [[getAsymmetricDocConcentration]] */
+ /**
+ * Alias for [[getAsymmetricDocConcentration]]
+ * @since 1.5.0
+ */
def getAsymmetricAlpha: Vector = getAsymmetricDocConcentration
- /** Alias for [[getDocConcentration]] */
+ /**
+ * Alias for [[getDocConcentration]]
+ * @since 1.3.0
+ */
def getAlpha: Double = getDocConcentration
- /** Alias for [[setDocConcentration()]] */
+ /**
+ * Alias for [[setDocConcentration()]]
+ * @since 1.5.0
+ */
def setAlpha(alpha: Vector): this.type = setDocConcentration(alpha)
- /** Alias for [[setDocConcentration()]] */
+ /**
+ * Alias for [[setDocConcentration()]]
+ * @since 1.3.0
+ */
def setAlpha(alpha: Double): this.type = setDocConcentration(alpha)
/**
@@ -154,6 +180,7 @@ class LDA private (
*
* Note: The topics' distributions over terms are called "beta" in the original LDA paper
* by Blei et al., but are called "phi" in many later papers such as Asuncion et al., 2009.
+ * @since 1.3.0
*/
def getTopicConcentration: Double = this.topicConcentration
@@ -178,36 +205,51 @@ class LDA private (
* - Value should be >= 0
* - default = (1.0 / k), following the implementation from
* [[https://github.com/Blei-Lab/onlineldavb]].
+ * @since 1.3.0
*/
def setTopicConcentration(topicConcentration: Double): this.type = {
this.topicConcentration = topicConcentration
this
}
- /** Alias for [[getTopicConcentration]] */
+ /**
+ * Alias for [[getTopicConcentration]]
+ * @since 1.3.0
+ */
def getBeta: Double = getTopicConcentration
- /** Alias for [[setTopicConcentration()]] */
+ /**
+ * Alias for [[setTopicConcentration()]]
+ * @since 1.3.0
+ */
def setBeta(beta: Double): this.type = setTopicConcentration(beta)
/**
* Maximum number of iterations for learning.
+ * @since 1.3.0
*/
def getMaxIterations: Int = maxIterations
/**
* Maximum number of iterations for learning.
* (default = 20)
+ * @since 1.3.0
*/
def setMaxIterations(maxIterations: Int): this.type = {
this.maxIterations = maxIterations
this
}
- /** Random seed */
+ /**
+ * Random seed
+ * @since 1.3.0
+ */
def getSeed: Long = seed
- /** Random seed */
+ /**
+ * Random seed
+ * @since 1.3.0
+ */
def setSeed(seed: Long): this.type = {
this.seed = seed
this
@@ -215,6 +257,7 @@ class LDA private (
/**
* Period (in iterations) between checkpoints.
+ * @since 1.3.0
*/
def getCheckpointInterval: Int = checkpointInterval
@@ -225,6 +268,7 @@ class LDA private (
* [[org.apache.spark.SparkContext]], this setting is ignored.
*
* @see [[org.apache.spark.SparkContext#setCheckpointDir]]
+ * @since 1.3.0
*/
def setCheckpointInterval(checkpointInterval: Int): this.type = {
this.checkpointInterval = checkpointInterval
@@ -236,6 +280,7 @@ class LDA private (
* :: DeveloperApi ::
*
* LDAOptimizer used to perform the actual calculation
+ * @since 1.4.0
*/
@DeveloperApi
def getOptimizer: LDAOptimizer = ldaOptimizer
@@ -244,6 +289,7 @@ class LDA private (
* :: DeveloperApi ::
*
* LDAOptimizer used to perform the actual calculation (default = EMLDAOptimizer)
+ * @since 1.4.0
*/
@DeveloperApi
def setOptimizer(optimizer: LDAOptimizer): this.type = {
@@ -254,6 +300,7 @@ class LDA private (
/**
* Set the LDAOptimizer used to perform the actual calculation by algorithm name.
* Currently "em", "online" are supported.
+ * @since 1.4.0
*/
def setOptimizer(optimizerName: String): this.type = {
this.ldaOptimizer =
@@ -274,6 +321,7 @@ class LDA private (
* (where the vocabulary size is the length of the vector).
* Document IDs must be unique and >= 0.
* @return Inferred LDA model
+ * @since 1.3.0
*/
def run(documents: RDD[(Long, Vector)]): LDAModel = {
val state = ldaOptimizer.initialize(documents, this)
@@ -289,7 +337,10 @@ class LDA private (
state.getLDAModel(iterationTimes)
}
- /** Java-friendly version of [[run()]] */
+ /**
+ * Java-friendly version of [[run()]]
+ * @since 1.3.0
+ */
def run(documents: JavaPairRDD[java.lang.Long, Vector]): LDAModel = {
run(documents.rdd.asInstanceOf[RDD[(Long, Vector)]])
}
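A minimal LDA sketch for the builder API tagged above (not part of the patch), assuming an existing SparkContext `sc` and a toy term-count corpus:

import org.apache.spark.mllib.clustering.LDA
import org.apache.spark.mllib.linalg.Vectors

// Documents are (docId, termCountVector) pairs over an illustrative 4-term vocabulary.
val corpus = sc.parallelize(Seq(
  (0L, Vectors.dense(1.0, 2.0, 0.0, 0.0)),
  (1L, Vectors.dense(0.0, 0.0, 3.0, 1.0)),
  (2L, Vectors.dense(2.0, 1.0, 0.0, 1.0))))

// run() returns a DistributedLDAModel under the default EMLDAOptimizer.
val ldaModel = new LDA()
  .setK(2)
  .setMaxIterations(20)
  .setDocConcentration(1.1)    // alias: setAlpha
  .setTopicConcentration(1.1)  // alias: setBeta
  .run(corpus)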
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
index 82f05e4a18..b70e380c03 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -192,12 +192,24 @@ class LocalLDAModel private[clustering] (
override protected[clustering] val gammaShape: Double = 100)
extends LDAModel with Serializable {
+ /**
+ * @since 1.3.0
+ */
override def k: Int = topics.numCols
+ /**
+ * @since 1.3.0
+ */
override def vocabSize: Int = topics.numRows
+ /**
+ * @since 1.3.0
+ */
override def topicsMatrix: Matrix = topics
+ /**
+ * @since 1.3.0
+ */
override def describeTopics(maxTermsPerTopic: Int): Array[(Array[Int], Array[Double])] = {
val brzTopics = topics.toBreeze.toDenseMatrix
Range(0, k).map { topicIndex =>
@@ -210,6 +222,9 @@ class LocalLDAModel private[clustering] (
override protected def formatVersion = "1.0"
+ /**
+ * @since 1.5.0
+ */
override def save(sc: SparkContext, path: String): Unit = {
LocalLDAModel.SaveLoadV1_0.save(sc, path, topicsMatrix, docConcentration, topicConcentration,
gammaShape)
@@ -223,12 +238,16 @@ class LocalLDAModel private[clustering] (
*
* @param documents test corpus to use for calculating log likelihood
* @return variational lower bound on the log likelihood of the entire corpus
+ * @since 1.5.0
*/
def logLikelihood(documents: RDD[(Long, Vector)]): Double = logLikelihoodBound(documents,
docConcentration, topicConcentration, topicsMatrix.toBreeze.toDenseMatrix, gammaShape, k,
vocabSize)
- /** Java-friendly version of [[logLikelihood]] */
+ /**
+ * Java-friendly version of [[logLikelihood]]
+ * @since 1.5.0
+ */
def logLikelihood(documents: JavaPairRDD[java.lang.Long, Vector]): Double = {
logLikelihood(documents.rdd.asInstanceOf[RDD[(Long, Vector)]])
}
@@ -239,6 +258,7 @@ class LocalLDAModel private[clustering] (
*
* @param documents test corpus to use for calculating perplexity
* @return Variational upper bound on log perplexity per token.
+ * @since 1.5.0
*/
def logPerplexity(documents: RDD[(Long, Vector)]): Double = {
val corpusTokenCount = documents
@@ -247,7 +267,9 @@ class LocalLDAModel private[clustering] (
-logLikelihood(documents) / corpusTokenCount
}
- /** Java-friendly version of [[logPerplexity]] */
+ /** Java-friendly version of [[logPerplexity]]
+ * @since 1.5.0
+ */
def logPerplexity(documents: JavaPairRDD[java.lang.Long, Vector]): Double = {
logPerplexity(documents.rdd.asInstanceOf[RDD[(Long, Vector)]])
}
@@ -325,6 +347,7 @@ class LocalLDAModel private[clustering] (
* for each document.
* @param documents documents to predict topic mixture distributions for
* @return An RDD of (document ID, topic mixture distribution for document)
+ * @since 1.3.0
*/
// TODO: declare in LDAModel and override once implemented in DistributedLDAModel
def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = {
@@ -351,7 +374,10 @@ class LocalLDAModel private[clustering] (
}
}
- /** Java-friendly version of [[topicDistributions]] */
+ /**
+ * Java-friendly version of [[topicDistributions]]
+ * @since 1.4.1
+ */
def topicDistributions(
documents: JavaPairRDD[java.lang.Long, Vector]): JavaPairRDD[java.lang.Long, Vector] = {
val distributions = topicDistributions(documents.rdd.asInstanceOf[RDD[(Long, Vector)]])
@@ -425,6 +451,9 @@ object LocalLDAModel extends Loader[LocalLDAModel] {
}
}
+ /**
+ * @since 1.5.0
+ */
override def load(sc: SparkContext, path: String): LocalLDAModel = {
val (loadedClassName, loadedVersion, metadata) = Loader.loadMetadata(sc, path)
implicit val formats = DefaultFormats
@@ -481,6 +510,7 @@ class DistributedLDAModel private[clustering] (
* Convert model to a local model.
* The local model stores the inferred topics but not the topic distributions for training
* documents.
+ * @since 1.3.0
*/
def toLocal: LocalLDAModel = new LocalLDAModel(topicsMatrix, docConcentration, topicConcentration,
gammaShape)
@@ -491,6 +521,7 @@ class DistributedLDAModel private[clustering] (
* No guarantees are given about the ordering of the topics.
*
* WARNING: This matrix is collected from an RDD. Beware memory usage when vocabSize, k are large.
+ * @since 1.3.0
*/
override lazy val topicsMatrix: Matrix = {
// Collect row-major topics
@@ -510,6 +541,9 @@ class DistributedLDAModel private[clustering] (
Matrices.fromBreeze(brzTopics)
}
+ /**
+ * @since 1.3.0
+ */
override def describeTopics(maxTermsPerTopic: Int): Array[(Array[Int], Array[Double])] = {
val numTopics = k
// Note: N_k is not needed to find the top terms, but it is needed to normalize weights
@@ -548,6 +582,7 @@ class DistributedLDAModel private[clustering] (
* @return Array over topics. Each element represent as a pair of matching arrays:
* (IDs for the documents, weights of the topic in these documents).
* For each topic, documents are sorted in order of decreasing topic weights.
+ * @since 1.5.0
*/
def topDocumentsPerTopic(maxDocumentsPerTopic: Int): Array[(Array[Long], Array[Double])] = {
val numTopics = k
@@ -587,6 +622,7 @@ class DistributedLDAModel private[clustering] (
* - This excludes the prior; for that, use [[logPrior]].
* - Even with [[logPrior]], this is NOT the same as the data log likelihood given the
* hyperparameters.
+ * @since 1.3.0
*/
lazy val logLikelihood: Double = {
// TODO: generalize this for asymmetric (non-scalar) alpha
@@ -612,7 +648,8 @@ class DistributedLDAModel private[clustering] (
/**
* Log probability of the current parameter estimate:
- * log P(topics, topic distributions for docs | alpha, eta)
+ * log P(topics, topic distributions for docs | alpha, eta)
+ * @since 1.3.0
*/
lazy val logPrior: Double = {
// TODO: generalize this for asymmetric (non-scalar) alpha
@@ -644,6 +681,7 @@ class DistributedLDAModel private[clustering] (
* ("theta_doc").
*
* @return RDD of (document ID, topic distribution) pairs
+ * @since 1.3.0
*/
def topicDistributions: RDD[(Long, Vector)] = {
graph.vertices.filter(LDA.isDocumentVertex).map { case (docID, topicCounts) =>
@@ -651,7 +689,10 @@ class DistributedLDAModel private[clustering] (
}
}
- /** Java-friendly version of [[topicDistributions]] */
+ /**
+ * Java-friendly version of [[topicDistributions]]
+ * @since 1.4.1
+ */
def javaTopicDistributions: JavaPairRDD[java.lang.Long, Vector] = {
JavaPairRDD.fromRDD(topicDistributions.asInstanceOf[RDD[(java.lang.Long, Vector)]])
}
@@ -659,6 +700,7 @@ class DistributedLDAModel private[clustering] (
/**
* For each document, return the top k weighted topics for that document and their weights.
* @return RDD of (doc ID, topic indices, topic weights)
+ * @since 1.5.0
*/
def topTopicsPerDocument(k: Int): RDD[(Long, Array[Int], Array[Double])] = {
graph.vertices.filter(LDA.isDocumentVertex).map { case (docID, topicCounts) =>
@@ -673,7 +715,10 @@ class DistributedLDAModel private[clustering] (
}
}
- /** Java-friendly version of [[topTopicsPerDocument]] */
+ /**
+ * Java-friendly version of [[topTopicsPerDocument]]
+ * @since 1.5.0
+ */
def javaTopTopicsPerDocument(k: Int): JavaRDD[(java.lang.Long, Array[Int], Array[Double])] = {
val topics = topTopicsPerDocument(k)
topics.asInstanceOf[RDD[(java.lang.Long, Array[Int], Array[Double])]].toJavaRDD()
@@ -684,6 +729,10 @@ class DistributedLDAModel private[clustering] (
override protected def formatVersion = "1.0"
+ /**
+ * @since 1.5.0
+ */
override def save(sc: SparkContext, path: String): Unit = {
DistributedLDAModel.SaveLoadV1_0.save(
sc, path, graph, globalTopicTotals, k, vocabSize, docConcentration, topicConcentration,
@@ -784,6 +833,9 @@ object DistributedLDAModel extends Loader[DistributedLDAModel] {
}
+ /**
+ * @since 1.5.0
+ */
override def load(sc: SparkContext, path: String): DistributedLDAModel = {
val (loadedClassName, loadedVersion, metadata) = Loader.loadMetadata(sc, path)
implicit val formats = DefaultFormats
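The LDAModel methods tagged above can be exercised as follows, assuming the `ldaModel` from the previous sketch (trained with EMLDAOptimizer, so it is a DistributedLDAModel):

import org.apache.spark.mllib.clustering.DistributedLDAModel

val distModel = ldaModel.asInstanceOf[DistributedLDAModel]

// Top 3 terms per topic: an Array of (termIndices, termWeights) pairs.
distModel.describeTopics(maxTermsPerTopic = 3).zipWithIndex.foreach {
  case ((terms, weights), topic) =>
    println(s"topic $topic: " + terms.zip(weights).mkString(", "))
}

val localModel = distModel.toLocal                   // @since 1.3.0
val perDoc = distModel.topicDistributions.collect()  // (docId, theta) pairs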
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index a0008f9c99..360241c808 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -35,6 +35,7 @@ import org.apache.spark.rdd.RDD
*
* An LDAOptimizer specifies which optimization/learning/inference algorithm to use, and it can
* hold optimizer-specific parameters for users to set.
+ * @since 1.4.0
*/
@DeveloperApi
sealed trait LDAOptimizer {
@@ -73,7 +74,7 @@ sealed trait LDAOptimizer {
* - Paper which clearly explains several algorithms, including EM:
* Asuncion, Welling, Smyth, and Teh.
* "On Smoothing and Inference for Topic Models." UAI, 2009.
- *
+ * @since 1.4.0
*/
@DeveloperApi
final class EMLDAOptimizer extends LDAOptimizer {
@@ -225,6 +226,7 @@ final class EMLDAOptimizer extends LDAOptimizer {
*
* Original Online LDA paper:
* Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
+ * @since 1.4.0
*/
@DeveloperApi
final class OnlineLDAOptimizer extends LDAOptimizer {
@@ -274,6 +276,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
/**
* A (positive) learning parameter that downweights early iterations. Larger values make early
* iterations count less.
+ * @since 1.4.0
*/
def getTau0: Double = this.tau0
@@ -281,6 +284,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
* A (positive) learning parameter that downweights early iterations. Larger values make early
* iterations count less.
* Default: 1024, following the original Online LDA paper.
+ * @since 1.4.0
*/
def setTau0(tau0: Double): this.type = {
require(tau0 > 0, s"LDA tau0 must be positive, but was set to $tau0")
@@ -290,6 +294,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
/**
* Learning rate: exponential decay rate
+ * @since 1.4.0
*/
def getKappa: Double = this.kappa
@@ -297,6 +302,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
* Learning rate: exponential decay rate---should be between
* (0.5, 1.0] to guarantee asymptotic convergence.
* Default: 0.51, based on the original Online LDA paper.
+ * @since 1.4.0
*/
def setKappa(kappa: Double): this.type = {
require(kappa >= 0, s"Online LDA kappa must be nonnegative, but was set to $kappa")
@@ -306,6 +312,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
/**
* Mini-batch fraction, which sets the fraction of document sampled and used in each iteration
+ * @since 1.4.0
*/
def getMiniBatchFraction: Double = this.miniBatchFraction
@@ -318,6 +325,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
* maxIterations * miniBatchFraction >= 1.
*
* Default: 0.05, i.e., 5% of total documents.
+ * @since 1.4.0
*/
def setMiniBatchFraction(miniBatchFraction: Double): this.type = {
require(miniBatchFraction > 0.0 && miniBatchFraction <= 1.0,
@@ -329,6 +337,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
/**
* Optimize alpha, indicates whether alpha (Dirichlet parameter for document-topic distribution)
* will be optimized during training.
+ * @since 1.5.0
*/
def getOptimzeAlpha: Boolean = this.optimizeAlpha
@@ -336,6 +345,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
* Sets whether to optimize alpha parameter during training.
*
* Default: false
+ * @since 1.5.0
*/
def setOptimzeAlpha(optimizeAlpha: Boolean): this.type = {
this.optimizeAlpha = optimizeAlpha
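For the OnlineLDAOptimizer parameters tagged above, a configuration sketch (not part of the patch) assuming the `corpus` from the earlier LDA example:

import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer}

// The online optimizer is configured separately and plugged in via setOptimizer.
val onlineOptimizer = new OnlineLDAOptimizer()
  .setTau0(1024.0)
  .setKappa(0.51)
  .setMiniBatchFraction(0.05)

val onlineModel = new LDA()
  .setK(2)
  .setMaxIterations(50)
  .setOptimizer(onlineOptimizer)
  .run(corpus)  // the online optimizer yields a LocalLDAModel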
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
index 407e43a024..b4733ca975 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
@@ -39,12 +39,16 @@ import org.apache.spark.{Logging, SparkContext, SparkException}
*
* @param k number of clusters
* @param assignments an RDD of clustering [[PowerIterationClustering#Assignment]]s
+ * @since 1.3.0
*/
@Experimental
class PowerIterationClusteringModel(
val k: Int,
val assignments: RDD[PowerIterationClustering.Assignment]) extends Saveable with Serializable {
+ /**
+ * @since 1.4.0
+ */
override def save(sc: SparkContext, path: String): Unit = {
PowerIterationClusteringModel.SaveLoadV1_0.save(sc, this, path)
}
@@ -52,6 +56,9 @@ class PowerIterationClusteringModel(
override protected def formatVersion: String = "1.0"
}
+/**
+ * @since 1.4.0
+ */
object PowerIterationClusteringModel extends Loader[PowerIterationClusteringModel] {
override def load(sc: SparkContext, path: String): PowerIterationClusteringModel = {
PowerIterationClusteringModel.SaveLoadV1_0.load(sc, path)
@@ -65,6 +72,9 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode
private[clustering]
val thisClassName = "org.apache.spark.mllib.clustering.PowerIterationClusteringModel"
+ /**
+ * @since 1.4.0
+ */
def save(sc: SparkContext, model: PowerIterationClusteringModel, path: String): Unit = {
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
@@ -77,6 +87,9 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode
dataRDD.write.parquet(Loader.dataPath(path))
}
+ /**
+ * @since 1.4.0
+ */
def load(sc: SparkContext, path: String): PowerIterationClusteringModel = {
implicit val formats = DefaultFormats
val sqlContext = new SQLContext(sc)
@@ -120,13 +133,16 @@ class PowerIterationClustering private[clustering] (
import org.apache.spark.mllib.clustering.PowerIterationClustering._
- /** Constructs a PIC instance with default parameters: {k: 2, maxIterations: 100,
- * initMode: "random"}.
+ /**
+ * Constructs a PIC instance with default parameters: {k: 2, maxIterations: 100,
+ * initMode: "random"}.
+ * @since 1.3.0
*/
def this() = this(k = 2, maxIterations = 100, initMode = "random")
/**
* Set the number of clusters.
+ * @since 1.3.0
*/
def setK(k: Int): this.type = {
this.k = k
@@ -135,6 +151,7 @@ class PowerIterationClustering private[clustering] (
/**
* Set maximum number of iterations of the power iteration loop
+ * @since 1.3.0
*/
def setMaxIterations(maxIterations: Int): this.type = {
this.maxIterations = maxIterations
@@ -144,6 +161,7 @@ class PowerIterationClustering private[clustering] (
/**
* Set the initialization mode. This can be either "random" to use a random vector
* as vertex properties, or "degree" to use normalized sum similarities. Default: random.
+ * @since 1.3.0
*/
def setInitializationMode(mode: String): this.type = {
this.initMode = mode match {
@@ -164,6 +182,7 @@ class PowerIterationClustering private[clustering] (
* assume s,,ij,, = 0.0.
*
* @return a [[PowerIterationClusteringModel]] that contains the clustering result
+ * @since 1.5.0
*/
def run(graph: Graph[Double, Double]): PowerIterationClusteringModel = {
val w = normalize(graph)
@@ -185,6 +204,7 @@ class PowerIterationClustering private[clustering] (
* assume s,,ij,, = 0.0.
*
* @return a [[PowerIterationClusteringModel]] that contains the clustering result
+ * @since 1.3.0
*/
def run(similarities: RDD[(Long, Long, Double)]): PowerIterationClusteringModel = {
val w = normalize(similarities)
@@ -197,6 +217,7 @@ class PowerIterationClustering private[clustering] (
/**
* A Java-friendly version of [[PowerIterationClustering.run]].
+ * @since 1.3.0
*/
def run(similarities: JavaRDD[(java.lang.Long, java.lang.Long, java.lang.Double)])
: PowerIterationClusteringModel = {
@@ -221,6 +242,9 @@ class PowerIterationClustering private[clustering] (
}
}
+/**
+ * @since 1.3.0
+ */
@Experimental
object PowerIterationClustering extends Logging {
@@ -229,6 +253,7 @@ object PowerIterationClustering extends Logging {
* Cluster assignment.
* @param id node id
* @param cluster assigned cluster id
+ * @since 1.3.0
*/
@Experimental
case class Assignment(id: Long, cluster: Int)
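A usage sketch for the PowerIterationClustering API tagged above, assuming an existing SparkContext `sc`; the similarity triples are illustrative:

import org.apache.spark.mllib.clustering.PowerIterationClustering

// Similarities are (srcId, dstId, s_ij) triples with s_ij >= 0.
val similarities = sc.parallelize(Seq(
  (0L, 1L, 0.9), (1L, 2L, 0.9), (2L, 3L, 0.1), (3L, 4L, 0.9)))

val picModel = new PowerIterationClustering()
  .setK(2)
  .setMaxIterations(10)
  .setInitializationMode("degree")
  .run(similarities)

picModel.assignments.collect().foreach { a =>
  println(s"node ${a.id} -> cluster ${a.cluster}")
}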
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
index d9b34cec64..a915804b02 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
@@ -63,6 +63,7 @@ import org.apache.spark.util.random.XORShiftRandom
* such that at time t + h the discount applied to the data from t is 0.5.
* The definition remains the same whether the time unit is given
* as batches or points.
+ * @since 1.2.0
*
*/
@Experimental
@@ -70,7 +71,10 @@ class StreamingKMeansModel(
override val clusterCenters: Array[Vector],
val clusterWeights: Array[Double]) extends KMeansModel(clusterCenters) with Logging {
- /** Perform a k-means update on a batch of data. */
+ /**
+ * Perform a k-means update on a batch of data.
+ * @since 1.2.0
+ */
def update(data: RDD[Vector], decayFactor: Double, timeUnit: String): StreamingKMeansModel = {
// find nearest cluster to each point
@@ -82,6 +86,7 @@ class StreamingKMeansModel(
(p1._1, p1._2 + p2._2)
}
val dim = clusterCenters(0).size
+
val pointStats: Array[(Int, (Vector, Long))] = closest
.aggregateByKey((Vectors.zeros(dim), 0L))(mergeContribs, mergeContribs)
.collect()
@@ -161,6 +166,7 @@ class StreamingKMeansModel(
* .setRandomCenters(5, 100.0)
* .trainOn(DStream)
* }}}
+ * @since 1.2.0
*/
@Experimental
class StreamingKMeans(
@@ -168,23 +174,33 @@ class StreamingKMeans(
var decayFactor: Double,
var timeUnit: String) extends Logging with Serializable {
+ /** @since 1.2.0 */
def this() = this(2, 1.0, StreamingKMeans.BATCHES)
protected var model: StreamingKMeansModel = new StreamingKMeansModel(null, null)
- /** Set the number of clusters. */
+ /**
+ * Set the number of clusters.
+ * @since 1.2.0
+ */
def setK(k: Int): this.type = {
this.k = k
this
}
- /** Set the decay factor directly (for forgetful algorithms). */
+ /**
+ * Set the decay factor directly (for forgetful algorithms).
+ * @since 1.2.0
+ */
def setDecayFactor(a: Double): this.type = {
this.decayFactor = a
this
}
- /** Set the half life and time unit ("batches" or "points") for forgetful algorithms. */
+ /**
+ * Set the half life and time unit ("batches" or "points") for forgetful algorithms.
+ * @since 1.2.0
+ */
def setHalfLife(halfLife: Double, timeUnit: String): this.type = {
if (timeUnit != StreamingKMeans.BATCHES && timeUnit != StreamingKMeans.POINTS) {
throw new IllegalArgumentException("Invalid time unit for decay: " + timeUnit)
@@ -195,7 +211,10 @@ class StreamingKMeans(
this
}
- /** Specify initial centers directly. */
+ /**
+ * Specify initial centers directly.
+ * @since 1.2.0
+ */
def setInitialCenters(centers: Array[Vector], weights: Array[Double]): this.type = {
model = new StreamingKMeansModel(centers, weights)
this
@@ -207,6 +226,7 @@ class StreamingKMeans(
* @param dim Number of dimensions
* @param weight Weight for each center
* @param seed Random seed
+ * @since 1.2.0
*/
def setRandomCenters(dim: Int, weight: Double, seed: Long = Utils.random.nextLong): this.type = {
val random = new XORShiftRandom(seed)
@@ -216,7 +236,10 @@ class StreamingKMeans(
this
}
- /** Return the latest model. */
+ /**
+ * Return the latest model.
+ * @since 1.2.0
+ */
def latestModel(): StreamingKMeansModel = {
model
}
@@ -228,6 +251,7 @@ class StreamingKMeans(
* and updates the model using each batch of data from the stream.
*
* @param data DStream containing vector data
+ * @since 1.2.0
*/
def trainOn(data: DStream[Vector]) {
assertInitialized()
@@ -236,7 +260,10 @@ class StreamingKMeans(
}
}
- /** Java-friendly version of `trainOn`. */
+ /**
+ * Java-friendly version of `trainOn`.
+ * @since 1.4.0
+ */
def trainOn(data: JavaDStream[Vector]): Unit = trainOn(data.dstream)
/**
@@ -244,13 +271,17 @@ class StreamingKMeans(
*
* @param data DStream containing vector data
* @return DStream containing predictions
+ * @since 1.2.0
*/
def predictOn(data: DStream[Vector]): DStream[Int] = {
assertInitialized()
data.map(model.predict)
}
- /** Java-friendly version of `predictOn`. */
+ /**
+ * Java-friendly version of `predictOn`.
+ * @since 1.4.0
+ */
def predictOn(data: JavaDStream[Vector]): JavaDStream[java.lang.Integer] = {
JavaDStream.fromDStream(predictOn(data.dstream).asInstanceOf[DStream[java.lang.Integer]])
}
@@ -261,13 +292,17 @@ class StreamingKMeans(
* @param data DStream containing (key, feature vector) pairs
* @tparam K key type
* @return DStream containing the input keys and the predictions as values
+ * @since 1.2.0
*/
def predictOnValues[K: ClassTag](data: DStream[(K, Vector)]): DStream[(K, Int)] = {
assertInitialized()
data.mapValues(model.predict)
}
- /** Java-friendly version of `predictOnValues`. */
+ /**
+ * Java-friendly version of `predictOnValues`.
+ * @since 1.4.0
+ */
def predictOnValues[K](
data: JavaPairDStream[K, Vector]): JavaPairDStream[K, java.lang.Integer] = {
implicit val tag = fakeClassTag[K]
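Finally, a sketch of the StreamingKMeans flow tagged above. The `trainingStream` and `testStream` DStreams are assumed to exist (e.g., built from a StreamingContext plus parsing); the whole block is illustrative, not part of the patch:

import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.streaming.dstream.DStream

def attach(trainingStream: DStream[Vector], testStream: DStream[Vector]): Unit = {
  val skm = new StreamingKMeans()
    .setK(2)
    .setDecayFactor(1.0)
    .setRandomCenters(dim = 2, weight = 0.0)

  skm.trainOn(trainingStream)        // update the model with each batch
  skm.predictOn(testStream).print()  // emit cluster indices per batch
}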