path: root/python/pyspark/mllib
author    noelsmith <mail@noelsmith.com>  2015-10-26 21:28:18 -0700
committer Xiangrui Meng <meng@databricks.com>  2015-10-26 21:28:18 -0700
commit    5d4f6abec4e371093e01c084656173e9cfabf29b (patch)
tree      4965b0267e74928e4c9e291ed60eaf943f9ed3e8 /python/pyspark/mllib
parent    943d4fa204a827ca8ecc39d9cf04e86890ee9840 (diff)
[SPARK-10271][PYSPARK][MLLIB] Added @since tags to pyspark.mllib.clustering
Duplicated the since decorator from pyspark.sql into pyspark (also tweaked to handle functions without docstrings).

Added since to methods + "versionadded::" to classes (derived from the git file history in pyspark).

Author: noelsmith <mail@noelsmith.com>

Closes #8627 from noel-smith/SPARK-10271-since-mllib-clustering.
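For context, a minimal sketch of what such a decorator can look like. This approximates the pyspark.sql original plus the "functions without docstrings" tweak the message describes; the exact regex and formatting here are assumptions, not the verbatim pyspark implementation:

    import re

    def since(version):
        """Append a '.. versionadded::' note to the decorated function's docstring."""
        indent_p = re.compile(r'\n( +)')

        def deco(f):
            # Tweak over the pyspark.sql original: tolerate a missing docstring.
            doc = f.__doc__ or ""
            indents = indent_p.findall(doc)
            indent = ' ' * (min(len(m) for m in indents) if indents else 0)
            f.__doc__ = doc.rstrip() + "\n\n%s.. versionadded:: %s" % (indent, version)
            return f
        return deco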
Diffstat (limited to 'python/pyspark/mllib')
-rw-r--r--  python/pyspark/mllib/clustering.py | 69
1 file changed, 68 insertions(+), 1 deletion(-)
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index 6964a45db2..c451df17cf 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -28,7 +28,7 @@ from numpy import array, random, tile
from collections import namedtuple
-from pyspark import SparkContext
+from pyspark import SparkContext, since
from pyspark.rdd import RDD, ignore_unicode_prefix
from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, callJavaFunc, _py2java, _java2py
from pyspark.mllib.linalg import SparseVector, _convert_to_vector, DenseVector
@@ -96,21 +96,26 @@ class KMeansModel(Saveable, Loader):
... initialModel = KMeansModel([(-1000.0,-1000.0),(5.0,5.0),(1000.0,1000.0)]))
>>> model.clusterCenters
[array([-1000., -1000.]), array([ 5., 5.]), array([ 1000., 1000.])]
+
+ .. versionadded:: 0.9.0
"""
def __init__(self, centers):
self.centers = centers
@property
+ @since('1.0.0')
def clusterCenters(self):
"""Get the cluster centers, represented as a list of NumPy arrays."""
return self.centers
@property
+ @since('1.4.0')
def k(self):
"""Total number of clusters."""
return len(self.centers)
+ @since('0.9.0')
def predict(self, x):
"""Find the cluster to which x belongs in this model."""
best = 0
@@ -126,6 +131,7 @@ class KMeansModel(Saveable, Loader):
best_distance = distance
return best
+ @since('1.4.0')
def computeCost(self, rdd):
"""
Return the K-means cost (sum of squared distances of points to
@@ -135,20 +141,32 @@ class KMeansModel(Saveable, Loader):
[_convert_to_vector(c) for c in self.centers])
return cost
+ @since('1.4.0')
def save(self, sc, path):
+ """
+ Save this model to the given path.
+ """
java_centers = _py2java(sc, [_convert_to_vector(c) for c in self.centers])
java_model = sc._jvm.org.apache.spark.mllib.clustering.KMeansModel(java_centers)
java_model.save(sc._jsc.sc(), path)
@classmethod
+ @since('1.4.0')
def load(cls, sc, path):
+ """
+ Load a model from the given path.
+ """
java_model = sc._jvm.org.apache.spark.mllib.clustering.KMeansModel.load(sc._jsc.sc(), path)
return KMeansModel(_java2py(sc, java_model.clusterCenters()))
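The save/load pair documented above round-trips a model through the JVM-side KMeansModel. A usage sketch, assuming an active SparkContext `sc` and a hypothetical local path:

    from pyspark.mllib.clustering import KMeansModel

    model = KMeansModel([[0.0, 0.0], [10.0, 10.0]])
    model.save(sc, "/tmp/kmeans-model")                 # hypothetical path
    same_model = KMeansModel.load(sc, "/tmp/kmeans-model")
    print(same_model.clusterCenters)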
class KMeans(object):
+ """
+ .. versionadded:: 0.9.0
+ """
@classmethod
+ @since('0.9.0')
def train(cls, rdd, k, maxIterations=100, runs=1, initializationMode="k-means||",
seed=None, initializationSteps=5, epsilon=1e-4, initialModel=None):
"""Train a k-means clustering model."""
@@ -222,9 +240,12 @@ class GaussianMixtureModel(JavaModelWrapper, JavaSaveable, JavaLoader):
True
>>> labels[3]==labels[4]
True
+
+ .. versionadded:: 1.3.0
"""
@property
+ @since('1.4.0')
def weights(self):
"""
Weights for each Gaussian distribution in the mixture, where weights[i] is
@@ -233,6 +254,7 @@ class GaussianMixtureModel(JavaModelWrapper, JavaSaveable, JavaLoader):
return array(self.call("weights"))
@property
+ @since('1.4.0')
def gaussians(self):
"""
Array of MultivariateGaussian where gaussians[i] represents
@@ -243,10 +265,12 @@ class GaussianMixtureModel(JavaModelWrapper, JavaSaveable, JavaLoader):
for gaussian in zip(*self.call("gaussians"))]
@property
+ @since('1.4.0')
def k(self):
"""Number of gaussians in mixture."""
return len(self.weights)
+ @since('1.3.0')
def predict(self, x):
"""
Find the cluster to which the points in 'x' have maximum membership
@@ -262,6 +286,7 @@ class GaussianMixtureModel(JavaModelWrapper, JavaSaveable, JavaLoader):
raise TypeError("x should be represented by an RDD, "
"but got %s." % type(x))
+ @since('1.3.0')
def predictSoft(self, x):
"""
Find the membership of each point in 'x' to all mixture components.
@@ -279,6 +304,7 @@ class GaussianMixtureModel(JavaModelWrapper, JavaSaveable, JavaLoader):
"but got %s." % type(x))
@classmethod
+ @since('1.5.0')
def load(cls, sc, path):
"""Load the GaussianMixtureModel from disk.
@@ -302,8 +328,11 @@ class GaussianMixture(object):
:param maxIterations: Number of iterations. Default to 100
:param seed: Random Seed
:param initialModel: GaussianMixtureModel for initializing learning
+
+ .. versionadded:: 1.3.0
"""
@classmethod
+ @since('1.3.0')
def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None, initialModel=None):
"""Train a Gaussian Mixture clustering model."""
initialModelWeights = None
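A usage sketch of GaussianMixture.train and the model accessors tagged above, assuming an active SparkContext `sc` and toy one-dimensional data:

    from pyspark.mllib.clustering import GaussianMixture

    data = sc.parallelize([[-0.1], [-0.05], [9.9], [10.1]])
    gmm = GaussianMixture.train(data, k=2, convergenceTol=1e-3,
                                maxIterations=100, seed=10)
    print(gmm.weights)                      # mixing weights, @since('1.4.0')
    print(gmm.predict(data).collect())      # hard assignments, @since('1.3.0')
    print(gmm.predictSoft(data).collect())  # membership probabilities, @since('1.3.0')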
@@ -358,15 +387,19 @@ class PowerIterationClusteringModel(JavaModelWrapper, JavaSaveable, JavaLoader):
... rmtree(path)
... except OSError:
... pass
+
+ .. versionadded:: 1.5.0
"""
@property
+ @since('1.5.0')
def k(self):
"""
Returns the number of clusters.
"""
return self.call("k")
+ @since('1.5.0')
def assignments(self):
"""
Returns the cluster assignments of this model.
@@ -375,7 +408,11 @@ class PowerIterationClusteringModel(JavaModelWrapper, JavaSaveable, JavaLoader):
lambda x: (PowerIterationClustering.Assignment(*x)))
@classmethod
+ @since('1.5.0')
def load(cls, sc, path):
+ """
+ Load a model from the given path.
+ """
model = cls._load_java(sc, path)
wrapper = sc._jvm.PowerIterationClusteringModelWrapper(model)
return PowerIterationClusteringModel(wrapper)
@@ -390,9 +427,12 @@ class PowerIterationClustering(object):
From the abstract: PIC finds a very low-dimensional embedding of a
dataset using truncated power iteration on a normalized pair-wise
similarity matrix of the data.
+
+ .. versionadded:: 1.5.0
"""
@classmethod
+ @since('1.5.0')
def train(cls, rdd, k, maxIterations=100, initMode="random"):
"""
:param rdd: an RDD of (i, j, s,,ij,,) tuples representing the
@@ -415,6 +455,8 @@ class PowerIterationClustering(object):
class Assignment(namedtuple("Assignment", ["id", "cluster"])):
"""
Represents an (id, cluster) tuple.
+
+ .. versionadded:: 1.5.0
"""
@@ -474,17 +516,21 @@ class StreamingKMeansModel(KMeansModel):
0
>>> stkm.predict([1.5, 1.5])
1
+
+ .. versionadded:: 1.5.0
"""
def __init__(self, clusterCenters, clusterWeights):
super(StreamingKMeansModel, self).__init__(centers=clusterCenters)
self._clusterWeights = list(clusterWeights)
@property
+ @since('1.5.0')
def clusterWeights(self):
"""Return the cluster weights."""
return self._clusterWeights
@ignore_unicode_prefix
+ @since('1.5.0')
def update(self, data, decayFactor, timeUnit):
"""Update the centroids, according to data
@@ -523,6 +569,8 @@ class StreamingKMeans(object):
:param decayFactor: float, forgetfulness of the previous centroids.
:param timeUnit: can be "batches" or "points". If points, then the
decayfactor is raised to the power of no. of new points.
+
+ .. versionadded:: 1.5.0
"""
def __init__(self, k=2, decayFactor=1.0, timeUnit="batches"):
self._k = k
@@ -533,6 +581,7 @@ class StreamingKMeans(object):
self._timeUnit = timeUnit
self._model = None
+ @since('1.5.0')
def latestModel(self):
"""Return the latest model"""
return self._model
@@ -547,16 +596,19 @@ class StreamingKMeans(object):
"Expected dstream to be of type DStream, "
"got type %s" % type(dstream))
+ @since('1.5.0')
def setK(self, k):
"""Set number of clusters."""
self._k = k
return self
+ @since('1.5.0')
def setDecayFactor(self, decayFactor):
"""Set decay factor."""
self._decayFactor = decayFactor
return self
+ @since('1.5.0')
def setHalfLife(self, halfLife, timeUnit):
"""
Set number of batches after which the centroids of that
@@ -566,6 +618,7 @@ class StreamingKMeans(object):
self._decayFactor = exp(log(0.5) / halfLife)
return self
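The line above encodes decayFactor = 0.5 ** (1.0 / halfLife), so the weight of old centroids halves after halfLife time units; a quick check:

    from math import exp, log

    half_life = 5.0
    decay_factor = exp(log(0.5) / half_life)  # same as 0.5 ** (1.0 / half_life)
    print(decay_factor ** half_life)          # 0.5: old data halves after 5 units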
+ @since('1.5.0')
def setInitialCenters(self, centers, weights):
"""
Set initial centers. Should be set before calling trainOn.
@@ -573,6 +626,7 @@ class StreamingKMeans(object):
self._model = StreamingKMeansModel(centers, weights)
return self
+ @since('1.5.0')
def setRandomCenters(self, dim, weight, seed):
"""
Set the initial centres to be random samples from
@@ -584,6 +638,7 @@ class StreamingKMeans(object):
self._model = StreamingKMeansModel(clusterCenters, clusterWeights)
return self
+ @since('1.5.0')
def trainOn(self, dstream):
"""Train the model on the incoming dstream."""
self._validate(dstream)
@@ -593,6 +648,7 @@ class StreamingKMeans(object):
dstream.foreachRDD(update)
+ @since('1.5.0')
def predictOn(self, dstream):
"""
Make predictions on a dstream.
@@ -601,6 +657,7 @@ class StreamingKMeans(object):
self._validate(dstream)
return dstream.map(lambda x: self._model.predict(x))
+ @since('1.5.0')
def predictOnValues(self, dstream):
"""
Make predictions on a keyed dstream.
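A sketch of the streaming workflow these methods form, assuming an existing StreamingContext with `training_stream` (a DStream of vectors) and `keyed_stream` (a DStream of (key, vector) pairs):

    from pyspark.mllib.clustering import StreamingKMeans

    stkm = (StreamingKMeans(k=2, decayFactor=1.0, timeUnit="batches")
            .setRandomCenters(dim=2, weight=1.0, seed=42))
    stkm.trainOn(training_stream)                # update centers per batch
    stkm.predictOnValues(keyed_stream).pprint()  # keyed predictions, @since('1.5.0')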
@@ -649,16 +706,21 @@ class LDAModel(JavaModelWrapper):
... rmtree(path)
... except OSError:
... pass
+
+ .. versionadded:: 1.5.0
"""
+ @since('1.5.0')
def topicsMatrix(self):
"""Inferred topics, where each topic is represented by a distribution over terms."""
return self.call("topicsMatrix").toArray()
+ @since('1.5.0')
def vocabSize(self):
"""Vocabulary size (number of unique terms in the vocabulary)"""
return self.call("vocabSize")
+ @since('1.5.0')
def save(self, sc, path):
"""Save the LDAModel on to disk.
@@ -672,6 +734,7 @@ class LDAModel(JavaModelWrapper):
self._java_model.save(sc._jsc.sc(), path)
@classmethod
+ @since('1.5.0')
def load(cls, sc, path):
"""Load the LDAModel from disk.
@@ -688,8 +751,12 @@ class LDAModel(JavaModelWrapper):
class LDA(object):
+ """
+ .. versionadded:: 1.5.0
+ """
@classmethod
+ @since('1.5.0')
def train(cls, rdd, k=10, maxIterations=20, docConcentration=-1.0,
topicConcentration=-1.0, seed=None, checkpointInterval=10, optimizer="em"):
"""Train a LDA model.