path: root/python/pyspark/ml/feature.py
author     Yun Ni <yunn@uber.com>              2017-02-15 16:26:05 -0800
committer  Yanbo Liang <ybliang8@gmail.com>    2017-02-15 16:26:05 -0800
commit     08c1972a0661d42f300520cc6e5fb31023de093b (patch)
tree       8b392b4520df66ca32834c11fc376009be70e8b8 /python/pyspark/ml/feature.py
parent     21b4ba2d6f21a9759af879471715c123073bd67a (diff)
[SPARK-18080][ML][PYTHON] Python API & Examples for Locality Sensitive Hashing
## What changes were proposed in this pull request?

This pull request adds the Python API and examples for LSH. The API is based on yanboliang's PR #15768, with conflicts resolved and the API updated to match changes in the Scala API. The examples are consistent with the Scala examples for MinHashLSH and BucketedRandomProjectionLSH.

## How was this patch tested?

API and examples are tested using spark-submit:
`bin/spark-submit examples/src/main/python/ml/min_hash_lsh.py`
`bin/spark-submit examples/src/main/python/ml/bucketed_random_projection_lsh.py`

User guide changes are generated and manually inspected:
`SKIP_API=1 jekyll build`

Author: Yun Ni <yunn@uber.com>
Author: Yanbo Liang <ybliang8@gmail.com>
Author: Yunni <Euler57721@gmail.com>

Closes #16715 from Yunni/spark-18080.
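For orientation, a minimal sketch of what the new Python API looks like end to end, based on the doctests added in this diff (names and data here are illustrative only, not the shipped example files):

from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("LSHSketch").getOrCreate()
# Sparse binary vectors: non-zero indices are the elements of each set.
df = spark.createDataFrame([(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
                            (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
                            (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)],
                           ["id", "features"])
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=3, seed=12345)
model = mh.fit(df)
model.transform(df).show(truncate=False)   # adds the "hashes" column of hash values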
Diffstat (limited to 'python/pyspark/ml/feature.py')
-rwxr-xr-x  python/pyspark/ml/feature.py  291
1 file changed, 291 insertions, 0 deletions
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 1ab42919ea..c2eafbefcd 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -28,6 +28,7 @@ from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaTransformer, _jvm
from pyspark.ml.common import inherit_doc
__all__ = ['Binarizer',
+ 'BucketedRandomProjectionLSH', 'BucketedRandomProjectionLSHModel',
'Bucketizer',
'ChiSqSelector', 'ChiSqSelectorModel',
'CountVectorizer', 'CountVectorizerModel',
@@ -37,6 +38,7 @@ __all__ = ['Binarizer',
'IDF', 'IDFModel',
'IndexToString',
'MaxAbsScaler', 'MaxAbsScalerModel',
+ 'MinHashLSH', 'MinHashLSHModel',
'MinMaxScaler', 'MinMaxScalerModel',
'NGram',
'Normalizer',
@@ -120,6 +122,196 @@ class Binarizer(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, Java
return self.getOrDefault(self.threshold)
+class LSHParams(Params):
+ """
+ Mixin for Locality Sensitive Hashing (LSH) algorithm parameters.
+ """
+
+ numHashTables = Param(Params._dummy(), "numHashTables", "number of hash tables, where " +
+ "increasing the number of hash tables lowers the false negative rate, " +
+ "and decreasing it improves running performance.",
+ typeConverter=TypeConverters.toInt)
+
+ def __init__(self):
+ super(LSHParams, self).__init__()
+
+ def setNumHashTables(self, value):
+ """
+ Sets the value of :py:attr:`numHashTables`.
+ """
+ return self._set(numHashTables=value)
+
+ def getNumHashTables(self):
+ """
+ Gets the value of numHashTables or its default value.
+ """
+ return self.getOrDefault(self.numHashTables)
+
+
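As a hedged illustration of the trade-off described by `numHashTables` above: more hash tables means fewer missed neighbors but slower transforms. The parameter is available both as a constructor argument and via the setter on the concrete estimators added later in this file (the values below are arbitrary):

from pyspark.ml.feature import MinHashLSH

mh_fast = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=1)
# Equivalent setter form, trading speed for a lower false negative rate:
mh_accurate = MinHashLSH(inputCol="features", outputCol="hashes").setNumHashTables(10)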
+class LSHModel(JavaModel):
+ """
+ Mixin for Locality Sensitive Hashing (LSH) models.
+ """
+
+ def approxNearestNeighbors(self, dataset, key, numNearestNeighbors, distCol="distCol"):
+ """
+ Given a large dataset and an item, approximately find at most k items which have the
+ closest distance to the item. If the :py:attr:`outputCol` is missing, the method will
+ transform the data; if the :py:attr:`outputCol` exists, it will use that. This allows
+ caching of the transformed data when necessary.
+
+ .. note:: This method is experimental and will likely change behavior in the next release.
+
+ :param dataset: The dataset to search for nearest neighbors of the key.
+ :param key: Feature vector representing the item to search for.
+ :param numNearestNeighbors: The maximum number of nearest neighbors.
+ :param distCol: Output column for storing the distance between each result row and the key.
+ Use "distCol" as default value if it's not specified.
+ :return: A dataset containing at most k items closest to the key. A column "distCol" is
+ added to show the distance between each row and the key.
+ """
+ return self._call_java("approxNearestNeighbors", dataset, key, numNearestNeighbors,
+ distCol)
+
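A hedged usage sketch of the caching behavior described above, reusing the `model` and `df` names from the doctests later in this diff: pre-computing the hash column once lets repeated nearest-neighbor queries reuse it.

from pyspark.ml.linalg import Vectors

hashed = model.transform(df).cache()          # adds the "hashes" output column once
key = Vectors.dense([1.0, 2.0])               # query point, as in the doctest below
neighbors = model.approxNearestNeighbors(hashed, key, 2)
neighbors.show()                              # result rows carry the extra "distCol" column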
+ def approxSimilarityJoin(self, datasetA, datasetB, threshold, distCol="distCol"):
+ """
+ Join two datasets to approximately find all pairs of rows whose distance is smaller than
+ the threshold. If the :py:attr:`outputCol` is missing, the method will transform the data;
+ if the :py:attr:`outputCol` exists, it will use that. This allows caching of the
+ transformed data when necessary.
+
+ :param datasetA: One of the datasets to join.
+ :param datasetB: Another dataset to join.
+ :param threshold: The threshold for the distance of row pairs.
+ :param distCol: Output column for storing the distance between each pair of rows. Use
+ "distCol" as default value if it's not specified.
+ :return: A joined dataset containing pairs of rows. The original rows are in columns
+ "datasetA" and "datasetB", and a column "distCol" is added to show the distance
+ between each pair.
+ """
+ return self._call_java("approxSimilarityJoin", datasetA, datasetB, threshold, distCol)
+
+
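And a hedged sketch of the similarity join, reusing the `df`/`df2` names from the doctests below; as documented above, the joined rows come back under the struct columns "datasetA" and "datasetB":

# All cross-dataset pairs closer than the threshold, with their distances.
pairs = model.approxSimilarityJoin(df, df2, 0.6, distCol="JaccardDistance")
pairs.select("datasetA.id", "datasetB.id", "JaccardDistance").show()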
+@inherit_doc
+class BucketedRandomProjectionLSH(JavaEstimator, LSHParams, HasInputCol, HasOutputCol, HasSeed,
+ JavaMLReadable, JavaMLWritable):
+ """
+ .. note:: Experimental
+
+ LSH class for Euclidean distance metrics.
+ The input is dense or sparse vectors, each of which represents a point in the Euclidean
+ distance space. The output will be vectors of configurable dimension. Hash values in the same
+ dimension are calculated by the same hash function.
+
+ .. seealso:: `Stable Distributions \
+ <https://en.wikipedia.org/wiki/Locality-sensitive_hashing#Stable_distributions>`_
+ .. seealso:: `Hashing for Similarity Search: A Survey <https://arxiv.org/abs/1408.2927>`_
+
+ >>> from pyspark.ml.linalg import Vectors
+ >>> from pyspark.sql.functions import col
+ >>> data = [(0, Vectors.dense([-1.0, -1.0]),),
+ ... (1, Vectors.dense([-1.0, 1.0]),),
+ ... (2, Vectors.dense([1.0, -1.0]),),
+ ... (3, Vectors.dense([1.0, 1.0]),)]
+ >>> df = spark.createDataFrame(data, ["id", "features"])
+ >>> brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes",
+ ... seed=12345, bucketLength=1.0)
+ >>> model = brp.fit(df)
+ >>> model.transform(df).head()
+ Row(id=0, features=DenseVector([-1.0, -1.0]), hashes=[DenseVector([-1.0])])
+ >>> data2 = [(4, Vectors.dense([2.0, 2.0]),),
+ ... (5, Vectors.dense([2.0, 3.0]),),
+ ... (6, Vectors.dense([3.0, 2.0]),),
+ ... (7, Vectors.dense([3.0, 3.0]),)]
+ >>> df2 = spark.createDataFrame(data2, ["id", "features"])
+ >>> model.approxNearestNeighbors(df2, Vectors.dense([1.0, 2.0]), 1).collect()
+ [Row(id=4, features=DenseVector([2.0, 2.0]), hashes=[DenseVector([1.0])], distCol=1.0)]
+ >>> model.approxSimilarityJoin(df, df2, 3.0, distCol="EuclideanDistance").select(
+ ... col("datasetA.id").alias("idA"),
+ ... col("datasetB.id").alias("idB"),
+ ... col("EuclideanDistance")).show()
+ +---+---+-----------------+
+ |idA|idB|EuclideanDistance|
+ +---+---+-----------------+
+ | 3| 6| 2.23606797749979|
+ +---+---+-----------------+
+ ...
+ >>> brpPath = temp_path + "/brp"
+ >>> brp.save(brpPath)
+ >>> brp2 = BucketedRandomProjectionLSH.load(brpPath)
+ >>> brp2.getBucketLength() == brp.getBucketLength()
+ True
+ >>> modelPath = temp_path + "/brp-model"
+ >>> model.save(modelPath)
+ >>> model2 = BucketedRandomProjectionLSHModel.load(modelPath)
+ >>> model.transform(df).head().hashes == model2.transform(df).head().hashes
+ True
+
+ .. versionadded:: 2.2.0
+ """
+
+ bucketLength = Param(Params._dummy(), "bucketLength", "the length of each hash bucket; " +
+ "a larger bucket lowers the false negative rate.",
+ typeConverter=TypeConverters.toFloat)
+
+ @keyword_only
+ def __init__(self, inputCol=None, outputCol=None, seed=None, numHashTables=1,
+ bucketLength=None):
+ """
+ __init__(self, inputCol=None, outputCol=None, seed=None, numHashTables=1, \
+ bucketLength=None)
+ """
+ super(BucketedRandomProjectionLSH, self).__init__()
+ self._java_obj = \
+ self._new_java_obj("org.apache.spark.ml.feature.BucketedRandomProjectionLSH", self.uid)
+ self._setDefault(numHashTables=1)
+ kwargs = self.__init__._input_kwargs
+ self.setParams(**kwargs)
+
+ @keyword_only
+ @since("2.2.0")
+ def setParams(self, inputCol=None, outputCol=None, seed=None, numHashTables=1,
+ bucketLength=None):
+ """
+ setParams(self, inputCol=None, outputCol=None, seed=None, numHashTables=1, \
+ bucketLength=None)
+ Sets params for this BucketedRandomProjectionLSH.
+ """
+ kwargs = self.setParams._input_kwargs
+ return self._set(**kwargs)
+
+ @since("2.2.0")
+ def setBucketLength(self, value):
+ """
+ Sets the value of :py:attr:`bucketLength`.
+ """
+ return self._set(bucketLength=value)
+
+ @since("2.2.0")
+ def getBucketLength(self):
+ """
+ Gets the value of bucketLength or its default value.
+ """
+ return self.getOrDefault(self.bucketLength)
+
+ def _create_model(self, java_model):
+ return BucketedRandomProjectionLSHModel(java_model)
+
+
+class BucketedRandomProjectionLSHModel(LSHModel, JavaMLReadable, JavaMLWritable):
+ """
+ .. note:: Experimental
+
+ Model fitted by :py:class:`BucketedRandomProjectionLSH`, where multiple random vectors are
+ stored. The vectors are normalized to be unit vectors and each vector is used in a hash
+ function: :math:`h_i(x) = floor(r_i \cdot x / bucketLength)` where :math:`r_i` is the
+ i-th random unit vector. The number of buckets will be `(max L2 norm of input vectors) /
+ bucketLength`.
+
+ .. versionadded:: 2.2.0
+ """
+
+
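A worked numeric sketch of the hash function above; the random unit vector, input point, and bucket length are made-up values, not ones a fitted model would actually draw:

import math

r = [0.6, 0.8]          # illustrative random unit vector (0.6**2 + 0.8**2 == 1.0)
x = [2.0, 2.0]          # illustrative input point
bucketLength = 1.0
h = math.floor(sum(ri * xi for ri, xi in zip(r, x)) / bucketLength)
# r . x == 2.8, so h == 2: this hash function places x in bucket 2.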
@inherit_doc
class Bucketizer(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
"""
@@ -755,6 +947,105 @@ class MaxAbsScalerModel(JavaModel, JavaMLReadable, JavaMLWritable):
@inherit_doc
+class MinHashLSH(JavaEstimator, LSHParams, HasInputCol, HasOutputCol, HasSeed,
+ JavaMLReadable, JavaMLWritable):
+
+ """
+ .. note:: Experimental
+
+ LSH class for Jaccard distance.
+ The input can be dense or sparse vectors, but it is more efficient if it is sparse.
+ For example, `Vectors.sparse(10, [(2, 1.0), (3, 1.0), (5, 1.0)])` means there are 10 elements
+ in the space. This set contains elements 2, 3, and 5. Also, any input vector must have at
+ least 1 non-zero index, and all non-zero values are treated as binary "1" values.
+
+ .. seealso:: `Wikipedia on MinHash <https://en.wikipedia.org/wiki/MinHash>`_
+
+ >>> from pyspark.ml.linalg import Vectors
+ >>> from pyspark.sql.functions import col
+ >>> data = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
+ ... (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
+ ... (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]
+ >>> df = spark.createDataFrame(data, ["id", "features"])
+ >>> mh = MinHashLSH(inputCol="features", outputCol="hashes", seed=12345)
+ >>> model = mh.fit(df)
+ >>> model.transform(df).head()
+ Row(id=0, features=SparseVector(6, {0: 1.0, 1: 1.0, 2: 1.0}), hashes=[DenseVector([-1638925...
+ >>> data2 = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
+ ... (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
+ ... (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
+ >>> df2 = spark.createDataFrame(data2, ["id", "features"])
+ >>> key = Vectors.sparse(6, [1, 2], [1.0, 1.0])
+ >>> model.approxNearestNeighbors(df2, key, 1).collect()
+ [Row(id=5, features=SparseVector(6, {1: 1.0, 2: 1.0, 4: 1.0}), hashes=[DenseVector([-163892...
+ >>> model.approxSimilarityJoin(df, df2, 0.6, distCol="JaccardDistance").select(
+ ... col("datasetA.id").alias("idA"),
+ ... col("datasetB.id").alias("idB"),
+ ... col("JaccardDistance")).show()
+ +---+---+---------------+
+ |idA|idB|JaccardDistance|
+ +---+---+---------------+
+ | 1| 4| 0.5|
+ | 0| 5| 0.5|
+ +---+---+---------------+
+ ...
+ >>> mhPath = temp_path + "/mh"
+ >>> mh.save(mhPath)
+ >>> mh2 = MinHashLSH.load(mhPath)
+ >>> mh2.getOutputCol() == mh.getOutputCol()
+ True
+ >>> modelPath = temp_path + "/mh-model"
+ >>> model.save(modelPath)
+ >>> model2 = MinHashLSHModel.load(modelPath)
+ >>> model.transform(df).head().hashes == model2.transform(df).head().hashes
+ True
+
+ .. versionadded:: 2.2.0
+ """
+
+ @keyword_only
+ def __init__(self, inputCol=None, outputCol=None, seed=None, numHashTables=1):
+ """
+ __init__(self, inputCol=None, outputCol=None, seed=None, numHashTables=1)
+ """
+ super(MinHashLSH, self).__init__()
+ self._java_obj = self._new_java_obj("org.apache.spark.ml.feature.MinHashLSH", self.uid)
+ self._setDefault(numHashTables=1)
+ kwargs = self.__init__._input_kwargs
+ self.setParams(**kwargs)
+
+ @keyword_only
+ @since("2.2.0")
+ def setParams(self, inputCol=None, outputCol=None, seed=None, numHashTables=1):
+ """
+ setParams(self, inputCol=None, outputCol=None, seed=None, numHashTables=1)
+ Sets params for this MinHashLSH.
+ """
+ kwargs = self.setParams._input_kwargs
+ return self._set(**kwargs)
+
+ def _create_model(self, java_model):
+ return MinHashLSHModel(java_model)
+
+
+class MinHashLSHModel(LSHModel, JavaMLReadable, JavaMLWritable):
+ """
+ .. note:: Experimental
+
+ Model produced by :py:class:`MinHashLSH`, where multiple hash functions are stored. Each
+ hash function is picked from the following family of hash functions, where :math:`a_i` and
+ :math:`b_i` are randomly chosen integers less than prime:
+ :math:`h_i(x) = ((x \cdot a_i + b_i) \mod prime)`. This hash family is approximately
+ min-wise independent according to the reference.
+
+ .. seealso:: Tom Bohman, Colin Cooper, and Alan Frieze. "Min-wise independent linear \
+ permutations." Electronic Journal of Combinatorics 7 (2000): R26.
+
+ .. versionadded:: 2.2.0
+ """
+
+
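A minimal numeric sketch of the hash family above, assuming (as in standard MinHash) that a set's hash is the minimum of its per-element hash values; the constants and the mapping from vector entries to the integer x are illustrative, not what the fitted model draws:

prime = 31                     # illustrative small prime
a, b = 7, 3                    # illustrative random integers less than prime
active = [0, 2, 4]             # non-zero indices of a sparse input vector
h = min((e * a + b) % prime for e in active)
# per-element hashes are 3, 17, 0, so h == 0 for this hash function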
+@inherit_doc
class MinMaxScaler(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
"""
Rescale each feature individually to a common range [min, max] linearly using column summary