aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorYanbo Liang <ybliang8@gmail.com>2015-06-28 22:38:04 -0700
committerXiangrui Meng <meng@databricks.com>2015-06-28 22:38:04 -0700
commitdfde31da5ce30e0d44cad4fb6618b44d5353d946 (patch)
treeae8506a9c2c79756df8479d89b1cc43f8eb933fe /python
parent25f574eb9a3cb9b93b7d9194a8ec16e00ce2c036 (diff)
downloadspark-dfde31da5ce30e0d44cad4fb6618b44d5353d946.tar.gz
spark-dfde31da5ce30e0d44cad4fb6618b44d5353d946.tar.bz2
spark-dfde31da5ce30e0d44cad4fb6618b44d5353d946.zip
[SPARK-5962] [MLLIB] Python support for Power Iteration Clustering
Python support for Power Iteration Clustering https://issues.apache.org/jira/browse/SPARK-5962 Author: Yanbo Liang <ybliang8@gmail.com> Closes #6992 from yanboliang/pyspark-pic and squashes the following commits: 6b03d82 [Yanbo Liang] address comments 4be4423 [Yanbo Liang] Python support for Power Iteration Clustering
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/mllib/clustering.py98
1 file changed, 95 insertions, 3 deletions
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index 8bc0654c76..e3c8a24c4a 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -25,15 +25,18 @@ from math import exp, log
from numpy import array, random, tile
+from collections import namedtuple
+
from pyspark import SparkContext
from pyspark.rdd import RDD, ignore_unicode_prefix
-from pyspark.mllib.common import callMLlibFunc, callJavaFunc, _py2java, _java2py
+from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, callJavaFunc, _py2java, _java2py
from pyspark.mllib.linalg import SparseVector, _convert_to_vector, DenseVector
from pyspark.mllib.stat.distribution import MultivariateGaussian
-from pyspark.mllib.util import Saveable, Loader, inherit_doc
+from pyspark.mllib.util import Saveable, Loader, inherit_doc, JavaLoader, JavaSaveable
from pyspark.streaming import DStream
__all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixture',
+ 'PowerIterationClusteringModel', 'PowerIterationClustering',
'StreamingKMeans', 'StreamingKMeansModel']
@@ -272,6 +275,94 @@ class GaussianMixture(object):
return GaussianMixtureModel(weight, mvg_obj)
class PowerIterationClusteringModel(JavaModelWrapper, JavaSaveable, JavaLoader):
    """
    .. note:: Experimental

    Clustering model produced by :class:`PowerIterationClustering`.

    >>> data = [(0, 1, 1.0), (0, 2, 1.0), (1, 3, 1.0), (2, 3, 1.0),
    ...     (0, 3, 1.0), (1, 2, 1.0), (0, 4, 0.1)]
    >>> rdd = sc.parallelize(data, 2)
    >>> model = PowerIterationClustering.train(rdd, 2, 100)
    >>> model.k
    2
    >>> sorted(model.assignments().collect())
    [Assignment(id=0, cluster=1), Assignment(id=1, cluster=0), ...
    >>> import os, tempfile
    >>> path = tempfile.mkdtemp()
    >>> model.save(sc, path)
    >>> sameModel = PowerIterationClusteringModel.load(sc, path)
    >>> sameModel.k
    2
    >>> sorted(sameModel.assignments().collect())
    [Assignment(id=0, cluster=1), Assignment(id=1, cluster=0), ...
    >>> from shutil import rmtree
    >>> try:
    ...     rmtree(path)
    ... except OSError:
    ...     pass
    """

    @property
    def k(self):
        """
        Returns the number of clusters.
        """
        return self.call("k")

    def assignments(self):
        """
        Returns the cluster assignments of this model as an RDD of
        ``Assignment(id, cluster)`` named tuples.
        """
        raw = self.call("getAssignments")
        return raw.map(lambda pair: PowerIterationClustering.Assignment(*pair))

    @classmethod
    def load(cls, sc, path):
        """
        Loads a previously saved model from ``path``.
        """
        java_model = cls._load_java(sc, path)
        # Re-wrap the raw Java model so the Python-side helper methods
        # (k, getAssignments) are exposed through the JVM wrapper.
        wrapper = sc._jvm.PowerIterationClusteringModelWrapper(java_model)
        return PowerIterationClusteringModel(wrapper)
+
+
class PowerIterationClustering(object):
    """
    .. note:: Experimental

    Power Iteration Clustering (PIC), a scalable graph clustering
    algorithm developed by Lin and Cohen
    (http://www.icml2010.org/papers/387.pdf). From the abstract:
    PIC finds a very low-dimensional embedding of a dataset using
    truncated power iteration on a normalized pair-wise similarity
    matrix of the data.
    """

    @classmethod
    def train(cls, rdd, k, maxIterations=100, initMode="random"):
        """
        Trains a PIC model on the given affinity matrix.

        :param rdd: an RDD of (i, j, s,,ij,,) tuples representing the
            affinity matrix, which is the matrix A in the PIC paper.
            The similarity s,,ij,, must be nonnegative. This is a
            symmetric matrix and hence s,,ij,, = s,,ji,,. For any
            (i, j) with nonzero similarity, there should be either
            (i, j, s,,ij,,) or (j, i, s,,ji,,) in the input. Tuples
            with i = j are ignored, because we assume s,,ij,, = 0.0.
        :param k: Number of clusters.
        :param maxIterations: Maximum number of iterations of the
            PIC algorithm.
        :param initMode: Initialization mode.
        """
        # Each (i, j, s) tuple is shipped to the JVM as a vector; the
        # Scala wrapper unpacks it back into an affinity-matrix entry.
        vectorized = rdd.map(_convert_to_vector)
        java_model = callMLlibFunc("trainPowerIterationClusteringModel",
                                   vectorized, int(k), int(maxIterations),
                                   initMode)
        return PowerIterationClusteringModel(java_model)

    class Assignment(namedtuple("Assignment", ["id", "cluster"])):
        """
        Represents an (id, cluster) tuple.
        """
+
class StreamingKMeansModel(KMeansModel):
"""
.. note:: Experimental
@@ -466,7 +557,8 @@ class StreamingKMeans(object):
def _test():
import doctest
- globs = globals().copy()
+ import pyspark.mllib.clustering
+ globs = pyspark.mllib.clustering.__dict__.copy()
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()