From dfde31da5ce30e0d44cad4fb6618b44d5353d946 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Sun, 28 Jun 2015 22:38:04 -0700 Subject: [SPARK-5962] [MLLIB] Python support for Power Iteration Clustering Python support for Power Iteration Clustering https://issues.apache.org/jira/browse/SPARK-5962 Author: Yanbo Liang Closes #6992 from yanboliang/pyspark-pic and squashes the following commits: 6b03d82 [Yanbo Liang] address comments 4be4423 [Yanbo Liang] Python support for Power Iteration Clustering --- python/pyspark/mllib/clustering.py | 98 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 95 insertions(+), 3 deletions(-) (limited to 'python') diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 8bc0654c76..e3c8a24c4a 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -25,15 +25,18 @@ from math import exp, log from numpy import array, random, tile +from collections import namedtuple + from pyspark import SparkContext from pyspark.rdd import RDD, ignore_unicode_prefix -from pyspark.mllib.common import callMLlibFunc, callJavaFunc, _py2java, _java2py +from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, callJavaFunc, _py2java, _java2py from pyspark.mllib.linalg import SparseVector, _convert_to_vector, DenseVector from pyspark.mllib.stat.distribution import MultivariateGaussian -from pyspark.mllib.util import Saveable, Loader, inherit_doc +from pyspark.mllib.util import Saveable, Loader, inherit_doc, JavaLoader, JavaSaveable from pyspark.streaming import DStream __all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixture', + 'PowerIterationClusteringModel', 'PowerIterationClustering', 'StreamingKMeans', 'StreamingKMeansModel'] @@ -272,6 +275,94 @@ class GaussianMixture(object): return GaussianMixtureModel(weight, mvg_obj) +class PowerIterationClusteringModel(JavaModelWrapper, JavaSaveable, JavaLoader): + + """ + .. note:: Experimental + + Model produced by [[PowerIterationClustering]]. + + >>> data = [(0, 1, 1.0), (0, 2, 1.0), (1, 3, 1.0), (2, 3, 1.0), + ... (0, 3, 1.0), (1, 2, 1.0), (0, 4, 0.1)] + >>> rdd = sc.parallelize(data, 2) + >>> model = PowerIterationClustering.train(rdd, 2, 100) + >>> model.k + 2 + >>> sorted(model.assignments().collect()) + [Assignment(id=0, cluster=1), Assignment(id=1, cluster=0), ... + >>> import os, tempfile + >>> path = tempfile.mkdtemp() + >>> model.save(sc, path) + >>> sameModel = PowerIterationClusteringModel.load(sc, path) + >>> sameModel.k + 2 + >>> sorted(sameModel.assignments().collect()) + [Assignment(id=0, cluster=1), Assignment(id=1, cluster=0), ... + >>> from shutil import rmtree + >>> try: + ... rmtree(path) + ... except OSError: + ... pass + """ + + @property + def k(self): + """ + Returns the number of clusters. + """ + return self.call("k") + + def assignments(self): + """ + Returns the cluster assignments of this model. + """ + return self.call("getAssignments").map( + lambda x: (PowerIterationClustering.Assignment(*x))) + + @classmethod + def load(cls, sc, path): + model = cls._load_java(sc, path) + wrapper = sc._jvm.PowerIterationClusteringModelWrapper(model) + return PowerIterationClusteringModel(wrapper) + + +class PowerIterationClustering(object): + """ + .. note:: Experimental + + Power Iteration Clustering (PIC), a scalable graph clustering algorithm + developed by [[http://www.icml2010.org/papers/387.pdf Lin and Cohen]]. + From the abstract: PIC finds a very low-dimensional embedding of a + dataset using truncated power iteration on a normalized pair-wise + similarity matrix of the data. + """ + + @classmethod + def train(cls, rdd, k, maxIterations=100, initMode="random"): + """ + :param rdd: an RDD of (i, j, s,,ij,,) tuples representing the + affinity matrix, which is the matrix A in the PIC paper. + The similarity s,,ij,, must be nonnegative. + This is a symmetric matrix and hence s,,ij,, = s,,ji,,. + For any (i, j) with nonzero similarity, there should be + either (i, j, s,,ij,,) or (j, i, s,,ji,,) in the input. + Tuples with i = j are ignored, because we assume + s,,ij,, = 0.0. + :param k: Number of clusters. + :param maxIterations: Maximum number of iterations of the + PIC algorithm. + :param initMode: Initialization mode. + """ + model = callMLlibFunc("trainPowerIterationClusteringModel", + rdd.map(_convert_to_vector), int(k), int(maxIterations), initMode) + return PowerIterationClusteringModel(model) + + class Assignment(namedtuple("Assignment", ["id", "cluster"])): + """ + Represents an (id, cluster) tuple. + """ + + class StreamingKMeansModel(KMeansModel): """ .. note:: Experimental @@ -466,7 +557,8 @@ class StreamingKMeans(object): def _test(): import doctest - globs = globals().copy() + import pyspark.mllib.clustering + globs = pyspark.mllib.clustering.__dict__.copy() globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() -- cgit v1.2.3