aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala12
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala31
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala13
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala4
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala6
-rw-r--r--python/pyspark/mllib/clustering.py265
6 files changed, 228 insertions, 103 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala
index 7b203e2f40..88dbfe3fcc 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala
@@ -45,10 +45,10 @@ import org.apache.spark.util.Utils
* This is due to high-dimensional data (a) making it difficult to cluster at all (based
* on statistical/theoretical arguments) and (b) numerical issues with Gaussian distributions.
*
- * @param k The number of independent Gaussians in the mixture model
- * @param convergenceTol The maximum change in log-likelihood at which convergence
- * is considered to have occurred.
- * @param maxIterations The maximum number of iterations to perform
+ * @param k Number of independent Gaussians in the mixture model.
+ * @param convergenceTol Maximum change in log-likelihood at which convergence
+ * is considered to have occurred.
+ * @param maxIterations Maximum number of iterations allowed.
*/
@Since("1.3.0")
class GaussianMixture private (
@@ -108,7 +108,7 @@ class GaussianMixture private (
def getK: Int = k
/**
- * Set the maximum number of iterations to run. Default: 100
+ * Set the maximum number of iterations allowed. Default: 100
*/
@Since("1.3.0")
def setMaxIterations(maxIterations: Int): this.type = {
@@ -117,7 +117,7 @@ class GaussianMixture private (
}
/**
- * Return the maximum number of iterations to run
+ * Return the maximum number of iterations allowed
*/
@Since("1.3.0")
def getMaxIterations: Int = maxIterations
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index ca11ede4cc..901164a391 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -70,13 +70,13 @@ class KMeans private (
}
/**
- * Maximum number of iterations to run.
+ * Maximum number of iterations allowed.
*/
@Since("1.4.0")
def getMaxIterations: Int = maxIterations
/**
- * Set maximum number of iterations to run. Default: 20.
+ * Set maximum number of iterations allowed. Default: 20.
*/
@Since("0.8.0")
def setMaxIterations(maxIterations: Int): this.type = {
@@ -482,12 +482,15 @@ object KMeans {
/**
* Trains a k-means model using the given set of parameters.
*
- * @param data training points stored as `RDD[Vector]`
- * @param k number of clusters
- * @param maxIterations max number of iterations
- * @param runs number of parallel runs, defaults to 1. The best model is returned.
- * @param initializationMode initialization model, either "random" or "k-means||" (default).
- * @param seed random seed value for cluster initialization
+ * @param data Training points as an `RDD` of `Vector` types.
+ * @param k Number of clusters to create.
+ * @param maxIterations Maximum number of iterations allowed.
+ * @param runs Number of runs to execute in parallel. The best model according to the cost
+ * function will be returned. (default: 1)
+ * @param initializationMode The initialization algorithm. This can either be "random" or
+ * "k-means||". (default: "k-means||")
+ * @param seed Random seed for cluster initialization. Default is to generate seed based
+ * on system time.
*/
@Since("1.3.0")
def train(
@@ -508,11 +511,13 @@ object KMeans {
/**
* Trains a k-means model using the given set of parameters.
*
- * @param data training points stored as `RDD[Vector]`
- * @param k number of clusters
- * @param maxIterations max number of iterations
- * @param runs number of parallel runs, defaults to 1. The best model is returned.
- * @param initializationMode initialization model, either "random" or "k-means||" (default).
+ * @param data Training points as an `RDD` of `Vector` types.
+ * @param k Number of clusters to create.
+ * @param maxIterations Maximum number of iterations allowed.
+ * @param runs Number of runs to execute in parallel. The best model according to the cost
+ * function will be returned. (default: 1)
+ * @param initializationMode The initialization algorithm. This can either be "random" or
+ * "k-means||". (default: "k-means||")
*/
@Since("0.8.0")
def train(
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
index eb802a365e..81566b4779 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
@@ -61,14 +61,13 @@ class LDA private (
ldaOptimizer = new EMLDAOptimizer)
/**
- * Number of topics to infer. I.e., the number of soft cluster centers.
- *
+ * Number of topics to infer, i.e., the number of soft cluster centers.
*/
@Since("1.3.0")
def getK: Int = k
/**
- * Number of topics to infer. I.e., the number of soft cluster centers.
+ * Set the number of topics to infer, i.e., the number of soft cluster centers.
* (default = 10)
*/
@Since("1.3.0")
@@ -222,13 +221,13 @@ class LDA private (
def setBeta(beta: Double): this.type = setTopicConcentration(beta)
/**
- * Maximum number of iterations for learning.
+ * Maximum number of iterations allowed.
*/
@Since("1.3.0")
def getMaxIterations: Int = maxIterations
/**
- * Maximum number of iterations for learning.
+ * Set the maximum number of iterations allowed.
* (default = 20)
*/
@Since("1.3.0")
@@ -238,13 +237,13 @@ class LDA private (
}
/**
- * Random seed
+ * Random seed for cluster initialization.
*/
@Since("1.3.0")
def getSeed: Long = seed
/**
- * Random seed
+ * Set the random seed for cluster initialization.
*/
@Since("1.3.0")
def setSeed(seed: Long): this.type = {
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
index 2ab0920b06..1ab7cb393b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
@@ -111,7 +111,9 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode
*
* @param k Number of clusters.
* @param maxIterations Maximum number of iterations of the PIC algorithm.
- * @param initMode Initialization mode.
+ * @param initMode Initialization mode. This can be either "random" to use a random vector
+ * as vertex properties, or "degree" to use normalized sum similarities.
+ * Default: random.
*
* @see [[http://en.wikipedia.org/wiki/Spectral_clustering Spectral clustering (Wikipedia)]]
*/
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
index 79d217e183..d99b89dc49 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
@@ -183,7 +183,7 @@ class StreamingKMeans @Since("1.2.0") (
}
/**
- * Set the decay factor directly (for forgetful algorithms).
+ * Set the forgetfulness of the previous centroids.
*/
@Since("1.2.0")
def setDecayFactor(a: Double): this.type = {
@@ -192,7 +192,9 @@ class StreamingKMeans @Since("1.2.0") (
}
/**
- * Set the half life and time unit ("batches" or "points") for forgetful algorithms.
+ * Set the half life and time unit ("batches" or "points"). If points, then the decay factor
+ * is raised to the power of number of new points and if batches, then decay factor will be
+ * used as is.
*/
@Since("1.2.0")
def setHalfLife(halfLife: Double, timeUnit: String): this.type = {
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index 4e9eb96fd9..ad04e46e88 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -88,8 +88,11 @@ class BisectingKMeansModel(JavaModelWrapper):
Find the cluster that each of the points belongs to in this
model.
- :param x: the point (or RDD of points) to determine
- compute the clusters for.
+ :param x:
+ A data point (or RDD of points) whose cluster index is to be determined.
+ :return:
+ Predicted cluster index or an RDD of predicted cluster indices
+ if the input is an RDD.
"""
if isinstance(x, RDD):
vecs = x.map(_convert_to_vector)
@@ -105,7 +108,8 @@ class BisectingKMeansModel(JavaModelWrapper):
points to their nearest center) for this model on the given
data. If provided with an RDD of points returns the sum.
- :param point: the point or RDD of points to compute the cost(s).
+ :param x:
+ A data point (or RDD of points) to compute the cost(s).
"""
if isinstance(x, RDD):
vecs = x.map(_convert_to_vector)
@@ -143,17 +147,23 @@ class BisectingKMeans(object):
"""
Runs the bisecting k-means algorithm return the model.
- :param rdd: input RDD to be trained on
- :param k: The desired number of leaf clusters (default: 4).
- The actual number could be smaller if there are no divisible
- leaf clusters.
- :param maxIterations: the max number of k-means iterations to
- split clusters (default: 20)
- :param minDivisibleClusterSize: the minimum number of points
- (if >= 1.0) or the minimum proportion of points (if < 1.0)
- of a divisible cluster (default: 1)
- :param seed: a random seed (default: -1888008604 from
- classOf[BisectingKMeans].getName.##)
+ :param rdd:
+ Training points as an `RDD` of `Vector` or convertible
+ sequence types.
+ :param k:
+ The desired number of leaf clusters. The actual number could
+ be smaller if there are no divisible leaf clusters.
+ (default: 4)
+ :param maxIterations:
+ Maximum number of iterations allowed to split clusters.
+ (default: 20)
+ :param minDivisibleClusterSize:
+ Minimum number of points (if >= 1.0) or the minimum proportion
+ of points (if < 1.0) of a divisible cluster.
+ (default: 1)
+ :param seed:
+ Random seed value for cluster initialization.
+ (default: -1888008604 from classOf[BisectingKMeans].getName.##)
"""
java_model = callMLlibFunc(
"trainBisectingKMeans", rdd.map(_convert_to_vector),
@@ -239,8 +249,11 @@ class KMeansModel(Saveable, Loader):
Find the cluster that each of the points belongs to in this
model.
- :param x: the point (or RDD of points) to determine
- compute the clusters for.
+ :param x:
+ A data point (or RDD of points) whose cluster index is to be determined.
+ :return:
+ Predicted cluster index or an RDD of predicted cluster indices
+ if the input is an RDD.
"""
best = 0
best_distance = float("inf")
@@ -262,7 +275,8 @@ class KMeansModel(Saveable, Loader):
their nearest center) for this model on the given
data.
- :param point: the RDD of points to compute the cost on.
+ :param rdd:
+ The RDD of points to compute the cost on.
"""
cost = callMLlibFunc("computeCostKmeansModel", rdd.map(_convert_to_vector),
[_convert_to_vector(c) for c in self.centers])
@@ -296,7 +310,44 @@ class KMeans(object):
@since('0.9.0')
def train(cls, rdd, k, maxIterations=100, runs=1, initializationMode="k-means||",
seed=None, initializationSteps=5, epsilon=1e-4, initialModel=None):
- """Train a k-means clustering model."""
+ """
+ Train a k-means clustering model.
+
+ :param rdd:
+ Training points as an `RDD` of `Vector` or convertible
+ sequence types.
+ :param k:
+ Number of clusters to create.
+ :param maxIterations:
+ Maximum number of iterations allowed.
+ (default: 100)
+ :param runs:
+ Number of runs to execute in parallel. The best model according
+ to the cost function will be returned (deprecated in 1.6.0).
+ (default: 1)
+ :param initializationMode:
+ The initialization algorithm. This can be either "random" or
+ "k-means||".
+ (default: "k-means||")
+ :param seed:
+ Random seed value for cluster initialization. Set as None to
+ generate seed based on system time.
+ (default: None)
+ :param initializationSteps:
+ Number of steps for the k-means|| initialization mode.
+ This is an advanced setting -- the default of 5 is almost
+ always enough.
+ (default: 5)
+ :param epsilon:
+ Distance threshold within which a center will be considered to
+ have converged. If all centers move less than this Euclidean
+ distance, iterations are stopped.
+ (default: 1e-4)
+ :param initialModel:
+ Initial cluster centers can be provided as a KMeansModel object
+ rather than using the random or k-means|| initializationMode.
+ (default: None)
+ """
if runs != 1:
warnings.warn(
"Support for runs is deprecated in 1.6.0. This param will have no effect in 2.0.0.")
@@ -415,8 +466,11 @@ class GaussianMixtureModel(JavaModelWrapper, JavaSaveable, JavaLoader):
Find the cluster to which the point 'x' or each point in RDD 'x'
has maximum membership in this model.
- :param x: vector or RDD of vector represents data points.
- :return: cluster label or RDD of cluster labels.
+ :param x:
+ A feature vector or an RDD of vectors representing data points.
+ :return:
+ Predicted cluster label or an RDD of predicted cluster labels
+ if the input is an RDD.
"""
if isinstance(x, RDD):
cluster_labels = self.predictSoft(x).map(lambda z: z.index(max(z)))
@@ -430,9 +484,11 @@ class GaussianMixtureModel(JavaModelWrapper, JavaSaveable, JavaLoader):
"""
Find the membership of point 'x' or each point in RDD 'x' to all mixture components.
- :param x: vector or RDD of vector represents data points.
- :return: the membership value to all mixture components for vector 'x'
- or each vector in RDD 'x'.
+ :param x:
+ A feature vector or an RDD of vectors representing data points.
+ :return:
+ The membership value to all mixture components for vector 'x'
+ or each vector in RDD 'x'.
"""
if isinstance(x, RDD):
means, sigmas = zip(*[(g.mu, g.sigma) for g in self.gaussians])
@@ -447,8 +503,10 @@ class GaussianMixtureModel(JavaModelWrapper, JavaSaveable, JavaLoader):
def load(cls, sc, path):
"""Load the GaussianMixtureModel from disk.
- :param sc: SparkContext
- :param path: str, path to where the model is stored.
+ :param sc:
+ SparkContext.
+ :param path:
+ Path to where the model is stored.
"""
model = cls._load_java(sc, path)
wrapper = sc._jvm.GaussianMixtureModelWrapper(model)
@@ -461,19 +519,35 @@ class GaussianMixture(object):
Learning algorithm for Gaussian Mixtures using the expectation-maximization algorithm.
- :param data: RDD of data points
- :param k: Number of components
- :param convergenceTol: Threshold value to check the convergence criteria. Defaults to 1e-3
- :param maxIterations: Number of iterations. Default to 100
- :param seed: Random Seed
- :param initialModel: GaussianMixtureModel for initializing learning
-
.. versionadded:: 1.3.0
"""
@classmethod
@since('1.3.0')
def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None, initialModel=None):
- """Train a Gaussian Mixture clustering model."""
+ """
+ Train a Gaussian Mixture clustering model.
+
+ :param rdd:
+ Training points as an `RDD` of `Vector` or convertible
+ sequence types.
+ :param k:
+ Number of independent Gaussians in the mixture model.
+ :param convergenceTol:
+ Maximum change in log-likelihood at which convergence is
+ considered to have occurred.
+ (default: 1e-3)
+ :param maxIterations:
+ Maximum number of iterations allowed.
+ (default: 100)
+ :param seed:
+ Random seed for initial Gaussian distribution. Set as None to
+ generate seed based on system time.
+ (default: None)
+ :param initialModel:
+ Initial GMM starting point, bypassing the random
+ initialization.
+ (default: None)
+ """
initialModelWeights = None
initialModelMu = None
initialModelSigma = None
@@ -574,18 +648,24 @@ class PowerIterationClustering(object):
@since('1.5.0')
def train(cls, rdd, k, maxIterations=100, initMode="random"):
"""
- :param rdd: an RDD of (i, j, s,,ij,,) tuples representing the
- affinity matrix, which is the matrix A in the PIC paper.
- The similarity s,,ij,, must be nonnegative.
- This is a symmetric matrix and hence s,,ij,, = s,,ji,,.
- For any (i, j) with nonzero similarity, there should be
- either (i, j, s,,ij,,) or (j, i, s,,ji,,) in the input.
- Tuples with i = j are ignored, because we assume
- s,,ij,, = 0.0.
- :param k: Number of clusters.
- :param maxIterations: Maximum number of iterations of the
- PIC algorithm.
- :param initMode: Initialization mode.
+ :param rdd:
+ An RDD of (i, j, s\ :sub:`ij`\) tuples representing the
+ affinity matrix, which is the matrix A in the PIC paper. The
+ similarity s\ :sub:`ij`\ must be nonnegative. This is a symmetric
+ matrix and hence s\ :sub:`ij`\ = s\ :sub:`ji`. For any (i, j) with
+ nonzero similarity, there should be either (i, j, s\ :sub:`ij`\) or
+ (j, i, s\ :sub:`ji`\) in the input. Tuples with i = j are ignored,
+ because it is assumed s\ :sub:`ij`\ = 0.0.
+ :param k:
+ Number of clusters.
+ :param maxIterations:
+ Maximum number of iterations of the PIC algorithm.
+ (default: 100)
+ :param initMode:
+ Initialization mode. This can be either "random" to use
+ a random vector as vertex properties, or "degree" to use
+ normalized sum similarities.
+ (default: "random")
"""
model = callMLlibFunc("trainPowerIterationClusteringModel",
rdd.map(_convert_to_vector), int(k), int(maxIterations), initMode)
@@ -625,8 +705,10 @@ class StreamingKMeansModel(KMeansModel):
and new data. If it set to zero, the old centroids are completely
forgotten.
- :param clusterCenters: Initial cluster centers.
- :param clusterWeights: List of weights assigned to each cluster.
+ :param clusterCenters:
+ Initial cluster centers.
+ :param clusterWeights:
+ List of weights assigned to each cluster.
>>> initCenters = [[0.0, 0.0], [1.0, 1.0]]
>>> initWeights = [1.0, 1.0]
@@ -673,11 +755,14 @@ class StreamingKMeansModel(KMeansModel):
def update(self, data, decayFactor, timeUnit):
"""Update the centroids, according to data
- :param data: Should be a RDD that represents the new data.
- :param decayFactor: forgetfulness of the previous centroids.
- :param timeUnit: Can be "batches" or "points". If points, then the
- decay factor is raised to the power of number of new
- points and if batches, it is used as it is.
+ :param data:
+ RDD with new data for the model update.
+ :param decayFactor:
+ Forgetfulness of the previous centroids.
+ :param timeUnit:
+ Can be "batches" or "points". If points, then the decay factor
+ is raised to the power of number of new points and if batches,
+ then decay factor will be used as is.
"""
if not isinstance(data, RDD):
raise TypeError("Data should be of an RDD, got %s." % type(data))
@@ -704,10 +789,17 @@ class StreamingKMeans(object):
More details on how the centroids are updated are provided under the
docs of StreamingKMeansModel.
- :param k: int, number of clusters
- :param decayFactor: float, forgetfulness of the previous centroids.
- :param timeUnit: can be "batches" or "points". If points, then the
- decayfactor is raised to the power of no. of new points.
+ :param k:
+ Number of clusters.
+ (default: 2)
+ :param decayFactor:
+ Forgetfulness of the previous centroids.
+ (default: 1.0)
+ :param timeUnit:
+ Can be "batches" or "points". If points, then the decay factor is
+ raised to the power of number of new points and if batches, then
+ decay factor will be used as is.
+ (default: "batches")
.. versionadded:: 1.5.0
"""
@@ -870,11 +962,13 @@ class LDAModel(JavaModelWrapper, JavaSaveable, Loader):
WARNING: If vocabSize and k are large, this can return a large object!
- :param maxTermsPerTopic: Maximum number of terms to collect for each topic.
- (default: vocabulary size)
- :return: Array over topics. Each topic is represented as a pair of matching arrays:
- (term indices, term weights in topic).
- Each topic's terms are sorted in order of decreasing weight.
+ :param maxTermsPerTopic:
+ Maximum number of terms to collect for each topic.
+ (default: vocabulary size)
+ :return:
+ Array over topics. Each topic is represented as a pair of
+ matching arrays: (term indices, term weights in topic).
+ Each topic's terms are sorted in order of decreasing weight.
"""
if maxTermsPerTopic is None:
topics = self.call("describeTopics")
@@ -887,8 +981,10 @@ class LDAModel(JavaModelWrapper, JavaSaveable, Loader):
def load(cls, sc, path):
"""Load the LDAModel from disk.
- :param sc: SparkContext
- :param path: str, path to where the model is stored.
+ :param sc:
+ SparkContext.
+ :param path:
+ Path to where the model is stored.
"""
if not isinstance(sc, SparkContext):
raise TypeError("sc should be a SparkContext, got type %s" % type(sc))
@@ -909,17 +1005,38 @@ class LDA(object):
topicConcentration=-1.0, seed=None, checkpointInterval=10, optimizer="em"):
"""Train a LDA model.
- :param rdd: RDD of data points
- :param k: Number of clusters you want
- :param maxIterations: Number of iterations. Default to 20
- :param docConcentration: Concentration parameter (commonly named "alpha")
- for the prior placed on documents' distributions over topics ("theta").
- :param topicConcentration: Concentration parameter (commonly named "beta" or "eta")
- for the prior placed on topics' distributions over terms.
- :param seed: Random Seed
- :param checkpointInterval: Period (in iterations) between checkpoints.
- :param optimizer: LDAOptimizer used to perform the actual calculation.
- Currently "em", "online" are supported. Default to "em".
+ :param rdd:
+ RDD of documents, which are tuples of document IDs and term
+ (word) count vectors. The term count vectors are "bags of
+ words" with a fixed-size vocabulary (where the vocabulary size
+ is the length of the vector). Document IDs must be unique
+ and >= 0.
+ :param k:
+ Number of topics to infer, i.e., the number of soft cluster
+ centers.
+ (default: 10)
+ :param maxIterations:
+ Maximum number of iterations allowed.
+ (default: 20)
+ :param docConcentration:
+ Concentration parameter (commonly named "alpha") for the prior
+ placed on documents' distributions over topics ("theta").
+ (default: -1.0)
+ :param topicConcentration:
+ Concentration parameter (commonly named "beta" or "eta") for
+ the prior placed on topics' distributions over terms.
+ (default: -1.0)
+ :param seed:
+ Random seed for cluster initialization. Set as None to generate
+ seed based on system time.
+ (default: None)
+ :param checkpointInterval:
+ Period (in iterations) between checkpoints.
+ (default: 10)
+ :param optimizer:
+ LDAOptimizer used to perform the actual calculation. Currently
+ "em", "online" are supported.
+ (default: "em")
"""
model = callMLlibFunc("trainLDAModel", rdd, k, maxIterations,
docConcentration, topicConcentration, seed,