 mllib/src/main/scala/org/apache/spark/mllib/api/python/PowerIterationClusteringModelWrapper.scala | 32 ++++++++
 mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala | 27 +++++++
 python/pyspark/mllib/clustering.py | 98 ++++++++++++++-
 3 files changed, 154 insertions(+), 3 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PowerIterationClusteringModelWrapper.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PowerIterationClusteringModelWrapper.scala
new file mode 100644
index 0000000000..bc6041b221
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PowerIterationClusteringModelWrapper.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.api.python
+
+import org.apache.spark.mllib.clustering.PowerIterationClusteringModel
+import org.apache.spark.rdd.RDD
+
+/**
+ * A wrapper around PowerIterationClusteringModel to provide helper methods for Python.
+ */
+private[python] class PowerIterationClusteringModelWrapper(model: PowerIterationClusteringModel)
+  extends PowerIterationClusteringModel(model.k, model.assignments) {
+
+  def getAssignments: RDD[Array[Any]] = {
+    model.assignments.map(x => Array(x.id, x.cluster))
+  }
+}
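For context, getAssignments flattens each Assignment case object into an Array(id, cluster) so Py4J can serialize it without a custom converter. A minimal sketch of what the Python side does with those pairs (the raw list below is a hypothetical stand-in for the deserialized RDD contents; the real glue is the assignments() method in clustering.py further down):

    from collections import namedtuple

    Assignment = namedtuple("Assignment", ["id", "cluster"])

    # Each element arriving over Py4J is a two-element sequence [id, cluster],
    # mirroring Array(x.id, x.cluster) built by the Scala wrapper above.
    raw = [[0, 1], [1, 0], [2, 1]]  # hypothetical stand-in for the RDD contents
    assignments = [Assignment(*pair) for pair in raw]
    assert assignments[0] == Assignment(id=0, cluster=1)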
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index b16903a8d5..a66a404d5c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -407,6 +407,33 @@ private[python] class PythonMLLibAPI extends Serializable {
}
/**
+   * Java stub for Python mllib PowerIterationClustering.run(). This stub returns a
+   * handle to the Java object instead of the content of the Java object. Extra care
+   * needs to be taken in the Python code to ensure it gets freed on exit; see the
+   * Py4J documentation.
+   * @param data an RDD of length-3 vectors encoding (i, j, s,,ij,,) affinity entries.
+   * @param k number of clusters.
+   * @param maxIterations maximum number of iterations of the power iteration loop.
+   * @param initMode the initialization mode. This can be either "random" to use
+   *                 a random vector as vertex properties, or "degree" to use
+   *                 normalized sum similarities. Default: random.
+   */
+  def trainPowerIterationClusteringModel(
+      data: JavaRDD[Vector],
+      k: Int,
+      maxIterations: Int,
+      initMode: String): PowerIterationClusteringModel = {
+
+    val pic = new PowerIterationClustering()
+      .setK(k)
+      .setMaxIterations(maxIterations)
+      .setInitializationMode(initMode)
+
+    val model = pic.run(data.rdd.map(v => (v(0).toLong, v(1).toLong, v(2))))
+    new PowerIterationClusteringModelWrapper(model)
+  }
+
+  /**
* Java stub for Python mllib ALS.train(). This stub returns a handle
* to the Java object instead of the content of the Java object. Extra care
* needs to be taken in the Python code to ensure it gets freed on exit; see
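The stub above expects each affinity entry packed as a length-3 numeric vector, which it unpacks with v(0).toLong, v(1).toLong, v(2). A rough sketch of that packing contract from the Python side, using a plain list as a hypothetical stand-in for a pyspark.mllib.linalg dense vector (the real call site is rdd.map(_convert_to_vector) in clustering.py below):

    # Hypothetical helper: each (i, j, s_ij) tuple becomes a length-3 dense
    # vector, so the Scala stub can read indices 0 and 1 as vertex ids
    # (cast to Long) and index 2 as the similarity.
    def pack_entry(entry):
        i, j, s = entry
        return [float(i), float(j), float(s)]

    print(pack_entry((0, 4, 0.1)))  # [0.0, 4.0, 0.1]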
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index 8bc0654c76..e3c8a24c4a 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -25,15 +25,18 @@ from math import exp, log
from numpy import array, random, tile
+from collections import namedtuple
+
from pyspark import SparkContext
from pyspark.rdd import RDD, ignore_unicode_prefix
-from pyspark.mllib.common import callMLlibFunc, callJavaFunc, _py2java, _java2py
+from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, callJavaFunc, _py2java, _java2py
from pyspark.mllib.linalg import SparseVector, _convert_to_vector, DenseVector
from pyspark.mllib.stat.distribution import MultivariateGaussian
-from pyspark.mllib.util import Saveable, Loader, inherit_doc
+from pyspark.mllib.util import Saveable, Loader, inherit_doc, JavaLoader, JavaSaveable
from pyspark.streaming import DStream
__all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixture',
+           'PowerIterationClusteringModel', 'PowerIterationClustering',
'StreamingKMeans', 'StreamingKMeansModel']
@@ -272,6 +275,94 @@ class GaussianMixture(object):
return GaussianMixtureModel(weight, mvg_obj)
+class PowerIterationClusteringModel(JavaModelWrapper, JavaSaveable, JavaLoader):
+
+    """
+    .. note:: Experimental
+
+    Model produced by :class:`PowerIterationClustering`.
+
+    >>> data = [(0, 1, 1.0), (0, 2, 1.0), (1, 3, 1.0), (2, 3, 1.0),
+    ...         (0, 3, 1.0), (1, 2, 1.0), (0, 4, 0.1)]
+    >>> rdd = sc.parallelize(data, 2)
+    >>> model = PowerIterationClustering.train(rdd, 2, 100)
+    >>> model.k
+    2
+    >>> sorted(model.assignments().collect())
+    [Assignment(id=0, cluster=1), Assignment(id=1, cluster=0), ...
+    >>> import tempfile
+    >>> path = tempfile.mkdtemp()
+    >>> model.save(sc, path)
+    >>> sameModel = PowerIterationClusteringModel.load(sc, path)
+    >>> sameModel.k
+    2
+    >>> sorted(sameModel.assignments().collect())
+    [Assignment(id=0, cluster=1), Assignment(id=1, cluster=0), ...
+    >>> from shutil import rmtree
+    >>> try:
+    ...     rmtree(path)
+    ... except OSError:
+    ...     pass
+    """
+
+    @property
+    def k(self):
+        """
+        Returns the number of clusters.
+        """
+        return self.call("k")
+
+    def assignments(self):
+        """
+        Returns the cluster assignments of this model.
+        """
+        return self.call("getAssignments").map(
+            lambda x: PowerIterationClustering.Assignment(*x))
+
+    @classmethod
+    def load(cls, sc, path):
+        model = cls._load_java(sc, path)
+        wrapper = sc._jvm.PowerIterationClusteringModelWrapper(model)
+        return PowerIterationClusteringModel(wrapper)
+
+
+class PowerIterationClustering(object):
+    """
+    .. note:: Experimental
+
+    Power Iteration Clustering (PIC), a scalable graph clustering algorithm
+    developed by `Lin and Cohen <http://www.icml2010.org/papers/387.pdf>`_.
+    From the abstract: PIC finds a very low-dimensional embedding of a
+    dataset using truncated power iteration on a normalized pair-wise
+    similarity matrix of the data.
+    """
+
+    @classmethod
+    def train(cls, rdd, k, maxIterations=100, initMode="random"):
+        """
+        :param rdd: An RDD of (i, j, s_ij) tuples representing the affinity
+          matrix, which is the matrix A in the PIC paper. The similarity
+          s_ij must be nonnegative; the matrix is symmetric, so s_ij = s_ji.
+          For any (i, j) with nonzero similarity, the input should contain
+          either (i, j, s_ij) or (j, i, s_ji). Tuples with i = j are
+          ignored, because s_ij is assumed to be 0.0.
+        :param k: Number of clusters.
+        :param maxIterations: Maximum number of iterations of the
+          PIC algorithm.
+        :param initMode: Initialization mode. This can be either "random",
+          to use a random vector as vertex properties, or "degree", to use
+          normalized sum similarities. Default: "random".
+        """
+        model = callMLlibFunc("trainPowerIterationClusteringModel",
+                              rdd.map(_convert_to_vector), int(k), int(maxIterations), initMode)
+        return PowerIterationClusteringModel(model)
+
+    class Assignment(namedtuple("Assignment", ["id", "cluster"])):
+        """
+        Represents an (id, cluster) tuple.
+        """
+
+
class StreamingKMeansModel(KMeansModel):
"""
.. note:: Experimental
@@ -466,7 +557,8 @@ class StreamingKMeans(object):
 def _test():
     import doctest
-    globs = globals().copy()
+    import pyspark.mllib.clustering
+    globs = pyspark.mllib.clustering.__dict__.copy()
     globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
     (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()
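As an end-to-end usage sketch of the new Python API (not part of the patch: the point layout and the Gaussian similarity helper are illustrative assumptions; any nonnegative, symmetric s_ij would do):

    from math import exp
    from pyspark import SparkContext
    from pyspark.mllib.clustering import PowerIterationClustering

    sc = SparkContext("local[2]", "PICExample")

    # Two well-separated groups on a line; the keys are the vertex ids.
    points = {0: 0.0, 1: 0.1, 2: 0.2, 3: 10.0, 4: 10.1}

    # Hypothetical Gaussian similarity kernel.
    def gaussian_similarity(a, b):
        return exp(-(points[a] - points[b]) ** 2)

    # Only the upper triangle is needed: (i, j, s_ij) or (j, i, s_ji) suffices.
    ids = sorted(points)
    edges = [(i, j, gaussian_similarity(i, j))
             for i in ids for j in ids if i < j]

    model = PowerIterationClustering.train(sc.parallelize(edges),
                                           k=2, maxIterations=30)
    for a in sorted(model.assignments().collect()):
        print("%d -> %d" % (a.id, a.cluster))

    sc.stop()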