From 5d4f6abec4e371093e01c084656173e9cfabf29b Mon Sep 17 00:00:00 2001 From: noelsmith Date: Mon, 26 Oct 2015 21:28:18 -0700 Subject: [SPARK-10271][PYSPARK][MLLIB] Added @since tags to pyspark.mllib.clustering Duplicated the since decorator from pyspark.sql into pyspark (also tweaked to handle functions without docstrings). Added since to methods + "versionadded::" to classes (derived from the git file history in pyspark). Author: noelsmith Closes #8627 from noel-smith/SPARK-10271-since-mllib-clustering. --- python/pyspark/mllib/clustering.py | 69 +++++++++++++++++++++++++++++++++++++- 1 file changed, 68 insertions(+), 1 deletion(-) (limited to 'python/pyspark/mllib') diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 6964a45db2..c451df17cf 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -28,7 +28,7 @@ from numpy import array, random, tile from collections import namedtuple -from pyspark import SparkContext +from pyspark import SparkContext, since from pyspark.rdd import RDD, ignore_unicode_prefix from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, callJavaFunc, _py2java, _java2py from pyspark.mllib.linalg import SparseVector, _convert_to_vector, DenseVector @@ -96,21 +96,26 @@ class KMeansModel(Saveable, Loader): ... initialModel = KMeansModel([(-1000.0,-1000.0),(5.0,5.0),(1000.0,1000.0)])) >>> model.clusterCenters [array([-1000., -1000.]), array([ 5., 5.]), array([ 1000., 1000.])] + + .. versionadded:: 0.9.0 """ def __init__(self, centers): self.centers = centers @property + @since('1.0.0') def clusterCenters(self): """Get the cluster centers, represented as a list of NumPy arrays.""" return self.centers @property + @since('1.4.0') def k(self): """Total number of clusters.""" return len(self.centers) + @since('0.9.0') def predict(self, x): """Find the cluster to which x belongs in this model.""" best = 0 @@ -126,6 +131,7 @@ class KMeansModel(Saveable, Loader): best_distance = distance return best + @since('1.4.0') def computeCost(self, rdd): """ Return the K-means cost (sum of squared distances of points to @@ -135,20 +141,32 @@ class KMeansModel(Saveable, Loader): [_convert_to_vector(c) for c in self.centers]) return cost + @since('1.4.0') def save(self, sc, path): + """ + Save this model to the given path. + """ java_centers = _py2java(sc, [_convert_to_vector(c) for c in self.centers]) java_model = sc._jvm.org.apache.spark.mllib.clustering.KMeansModel(java_centers) java_model.save(sc._jsc.sc(), path) @classmethod + @since('1.4.0') def load(cls, sc, path): + """ + Load a model from the given path. + """ java_model = sc._jvm.org.apache.spark.mllib.clustering.KMeansModel.load(sc._jsc.sc(), path) return KMeansModel(_java2py(sc, java_model.clusterCenters())) class KMeans(object): + """ + .. versionadded:: 0.9.0 + """ @classmethod + @since('0.9.0') def train(cls, rdd, k, maxIterations=100, runs=1, initializationMode="k-means||", seed=None, initializationSteps=5, epsilon=1e-4, initialModel=None): """Train a k-means clustering model.""" @@ -222,9 +240,12 @@ class GaussianMixtureModel(JavaModelWrapper, JavaSaveable, JavaLoader): True >>> labels[3]==labels[4] True + + .. versionadded:: 1.3.0 """ @property + @since('1.4.0') def weights(self): """ Weights for each Gaussian distribution in the mixture, where weights[i] is @@ -233,6 +254,7 @@ class GaussianMixtureModel(JavaModelWrapper, JavaSaveable, JavaLoader): return array(self.call("weights")) @property + @since('1.4.0') def gaussians(self): """ Array of MultivariateGaussian where gaussians[i] represents @@ -243,10 +265,12 @@ class GaussianMixtureModel(JavaModelWrapper, JavaSaveable, JavaLoader): for gaussian in zip(*self.call("gaussians"))] @property + @since('1.4.0') def k(self): """Number of gaussians in mixture.""" return len(self.weights) + @since('1.3.0') def predict(self, x): """ Find the cluster to which the points in 'x' has maximum membership @@ -262,6 +286,7 @@ class GaussianMixtureModel(JavaModelWrapper, JavaSaveable, JavaLoader): raise TypeError("x should be represented by an RDD, " "but got %s." % type(x)) + @since('1.3.0') def predictSoft(self, x): """ Find the membership of each point in 'x' to all mixture components. @@ -279,6 +304,7 @@ class GaussianMixtureModel(JavaModelWrapper, JavaSaveable, JavaLoader): "but got %s." % type(x)) @classmethod + @since('1.5.0') def load(cls, sc, path): """Load the GaussianMixtureModel from disk. @@ -302,8 +328,11 @@ class GaussianMixture(object): :param maxIterations: Number of iterations. Default to 100 :param seed: Random Seed :param initialModel: GaussianMixtureModel for initializing learning + + .. versionadded:: 1.3.0 """ @classmethod + @since('1.3.0') def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None, initialModel=None): """Train a Gaussian Mixture clustering model.""" initialModelWeights = None @@ -358,15 +387,19 @@ class PowerIterationClusteringModel(JavaModelWrapper, JavaSaveable, JavaLoader): ... rmtree(path) ... except OSError: ... pass + + .. versionadded:: 1.5.0 """ @property + @since('1.5.0') def k(self): """ Returns the number of clusters. """ return self.call("k") + @since('1.5.0') def assignments(self): """ Returns the cluster assignments of this model. @@ -375,7 +408,11 @@ class PowerIterationClusteringModel(JavaModelWrapper, JavaSaveable, JavaLoader): lambda x: (PowerIterationClustering.Assignment(*x))) @classmethod + @since('1.5.0') def load(cls, sc, path): + """ + Load a model from the given path. + """ model = cls._load_java(sc, path) wrapper = sc._jvm.PowerIterationClusteringModelWrapper(model) return PowerIterationClusteringModel(wrapper) @@ -390,9 +427,12 @@ class PowerIterationClustering(object): From the abstract: PIC finds a very low-dimensional embedding of a dataset using truncated power iteration on a normalized pair-wise similarity matrix of the data. + + .. versionadded:: 1.5.0 """ @classmethod + @since('1.5.0') def train(cls, rdd, k, maxIterations=100, initMode="random"): """ :param rdd: an RDD of (i, j, s,,ij,,) tuples representing the @@ -415,6 +455,8 @@ class PowerIterationClustering(object): class Assignment(namedtuple("Assignment", ["id", "cluster"])): """ Represents an (id, cluster) tuple. + + .. versionadded:: 1.5.0 """ @@ -474,17 +516,21 @@ class StreamingKMeansModel(KMeansModel): 0 >>> stkm.predict([1.5, 1.5]) 1 + + .. versionadded:: 1.5.0 """ def __init__(self, clusterCenters, clusterWeights): super(StreamingKMeansModel, self).__init__(centers=clusterCenters) self._clusterWeights = list(clusterWeights) @property + @since('1.5.0') def clusterWeights(self): """Return the cluster weights.""" return self._clusterWeights @ignore_unicode_prefix + @since('1.5.0') def update(self, data, decayFactor, timeUnit): """Update the centroids, according to data @@ -523,6 +569,8 @@ class StreamingKMeans(object): :param decayFactor: float, forgetfulness of the previous centroids. :param timeUnit: can be "batches" or "points". If points, then the decayfactor is raised to the power of no. of new points. + + .. versionadded:: 1.5.0 """ def __init__(self, k=2, decayFactor=1.0, timeUnit="batches"): self._k = k @@ -533,6 +581,7 @@ class StreamingKMeans(object): self._timeUnit = timeUnit self._model = None + @since('1.5.0') def latestModel(self): """Return the latest model""" return self._model @@ -547,16 +596,19 @@ class StreamingKMeans(object): "Expected dstream to be of type DStream, " "got type %s" % type(dstream)) + @since('1.5.0') def setK(self, k): """Set number of clusters.""" self._k = k return self + @since('1.5.0') def setDecayFactor(self, decayFactor): """Set decay factor.""" self._decayFactor = decayFactor return self + @since('1.5.0') def setHalfLife(self, halfLife, timeUnit): """ Set number of batches after which the centroids of that @@ -566,6 +618,7 @@ class StreamingKMeans(object): self._decayFactor = exp(log(0.5) / halfLife) return self + @since('1.5.0') def setInitialCenters(self, centers, weights): """ Set initial centers. Should be set before calling trainOn. @@ -573,6 +626,7 @@ class StreamingKMeans(object): self._model = StreamingKMeansModel(centers, weights) return self + @since('1.5.0') def setRandomCenters(self, dim, weight, seed): """ Set the initial centres to be random samples from @@ -584,6 +638,7 @@ class StreamingKMeans(object): self._model = StreamingKMeansModel(clusterCenters, clusterWeights) return self + @since('1.5.0') def trainOn(self, dstream): """Train the model on the incoming dstream.""" self._validate(dstream) @@ -593,6 +648,7 @@ class StreamingKMeans(object): dstream.foreachRDD(update) + @since('1.5.0') def predictOn(self, dstream): """ Make predictions on a dstream. @@ -601,6 +657,7 @@ class StreamingKMeans(object): self._validate(dstream) return dstream.map(lambda x: self._model.predict(x)) + @since('1.5.0') def predictOnValues(self, dstream): """ Make predictions on a keyed dstream. @@ -649,16 +706,21 @@ class LDAModel(JavaModelWrapper): ... rmtree(path) ... except OSError: ... pass + + .. versionadded:: 1.5.0 """ + @since('1.5.0') def topicsMatrix(self): """Inferred topics, where each topic is represented by a distribution over terms.""" return self.call("topicsMatrix").toArray() + @since('1.5.0') def vocabSize(self): """Vocabulary size (number of terms or terms in the vocabulary)""" return self.call("vocabSize") + @since('1.5.0') def save(self, sc, path): """Save the LDAModel on to disk. @@ -672,6 +734,7 @@ class LDAModel(JavaModelWrapper): self._java_model.save(sc._jsc.sc(), path) @classmethod + @since('1.5.0') def load(cls, sc, path): """Load the LDAModel from disk. @@ -688,8 +751,12 @@ class LDAModel(JavaModelWrapper): class LDA(object): + """ + .. versionadded:: 1.5.0 + """ @classmethod + @since('1.5.0') def train(cls, rdd, k=10, maxIterations=20, docConcentration=-1.0, topicConcentration=-1.0, seed=None, checkpointInterval=10, optimizer="em"): """Train a LDA model. -- cgit v1.2.3