From 08c1972a0661d42f300520cc6e5fb31023de093b Mon Sep 17 00:00:00 2001
From: Yun Ni
Date: Wed, 15 Feb 2017 16:26:05 -0800
Subject: [SPARK-18080][ML][PYTHON] Python API & Examples for Locality Sensitive Hashing

## What changes were proposed in this pull request?

This pull request adds the Python API and examples for LSH. The API changes are based on
yanboliang's PR #15768, with conflicts resolved and updates applied for the corresponding changes
in the Scala API. The examples are consistent with the Scala examples for MinHashLSH and
BucketedRandomProjectionLSH.

## How was this patch tested?

API and examples are tested using spark-submit:
`bin/spark-submit examples/src/main/python/ml/min_hash_lsh.py`
`bin/spark-submit examples/src/main/python/ml/bucketed_random_projection_lsh.py`

User guide changes are generated and manually inspected:
`SKIP_API=1 jekyll build`

Author: Yun Ni
Author: Yanbo Liang
Author: Yunni

Closes #16715 from Yunni/spark-18080.
---
 python/pyspark/ml/feature.py | 291 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 291 insertions(+)

diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 1ab42919ea..c2eafbefcd 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -28,6 +28,7 @@ from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaTransformer, _jvm
 from pyspark.ml.common import inherit_doc
 
 __all__ = ['Binarizer',
+           'BucketedRandomProjectionLSH', 'BucketedRandomProjectionLSHModel',
            'Bucketizer',
            'ChiSqSelector', 'ChiSqSelectorModel',
            'CountVectorizer', 'CountVectorizerModel',
@@ -37,6 +38,7 @@ __all__ = ['Binarizer',
            'IDF', 'IDFModel',
            'IndexToString',
            'MaxAbsScaler', 'MaxAbsScalerModel',
+           'MinHashLSH', 'MinHashLSHModel',
            'MinMaxScaler', 'MinMaxScalerModel',
            'NGram',
            'Normalizer',
@@ -120,6 +122,196 @@ class Binarizer(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, Java
         return self.getOrDefault(self.threshold)
 
 
+class LSHParams(Params):
+    """
+    Mixin for Locality Sensitive Hashing (LSH) algorithm parameters.
+    """
+
+    numHashTables = Param(Params._dummy(), "numHashTables", "number of hash tables, where " +
+                          "increasing number of hash tables lowers the false negative rate, " +
+                          "and decreasing it improves the running performance.",
+                          typeConverter=TypeConverters.toInt)
+
+    def __init__(self):
+        super(LSHParams, self).__init__()
+
+    def setNumHashTables(self, value):
+        """
+        Sets the value of :py:attr:`numHashTables`.
+        """
+        return self._set(numHashTables=value)
+
+    def getNumHashTables(self):
+        """
+        Gets the value of numHashTables or its default value.
+        """
+        return self.getOrDefault(self.numHashTables)
+
+
+class LSHModel(JavaModel):
+    """
+    Mixin for Locality Sensitive Hashing (LSH) models.
+    """
+
+    def approxNearestNeighbors(self, dataset, key, numNearestNeighbors, distCol="distCol"):
+        """
+        Given a large dataset and an item, approximately find at most k items which have the
+        closest distance to the item. If the :py:attr:`outputCol` is missing, the method will
+        transform the data; if the :py:attr:`outputCol` exists, it will use that. This allows
+        caching of the transformed data when necessary.
+
+        .. note:: This method is experimental and will likely change behavior in the next release.
+
+        :param dataset: The dataset to search for nearest neighbors of the key.
+        :param key: Feature vector representing the item to search for.
+        :param numNearestNeighbors: The maximum number of nearest neighbors.
+        :param distCol: Output column for storing the distance between each result row and the
+                        key. Use "distCol" as default value if it's not specified.
+        :return: A dataset containing at most k items closest to the key. A column "distCol" is
+                 added to show the distance between each row and the key.
+        """
+        return self._call_java("approxNearestNeighbors", dataset, key, numNearestNeighbors,
+                               distCol)
+
+    def approxSimilarityJoin(self, datasetA, datasetB, threshold, distCol="distCol"):
+        """
+        Join two datasets to approximately find all pairs of rows whose distance is smaller than
+        the threshold. If the :py:attr:`outputCol` is missing, the method will transform the data;
+        if the :py:attr:`outputCol` exists, it will use that. This allows caching of the
+        transformed data when necessary.
+
+        :param datasetA: One of the datasets to join.
+        :param datasetB: Another dataset to join.
+        :param threshold: The threshold for the distance of row pairs.
+        :param distCol: Output column for storing the distance between each pair of rows. Use
+                        "distCol" as default value if it's not specified.
+        :return: A joined dataset containing pairs of rows. The original rows are in columns
+                 "datasetA" and "datasetB", and a column "distCol" is added to show the distance
+                 between each pair.
+        """
+        return self._call_java("approxSimilarityJoin", datasetA, datasetB, threshold, distCol)
+
+
+@inherit_doc
+class BucketedRandomProjectionLSH(JavaEstimator, LSHParams, HasInputCol, HasOutputCol, HasSeed,
+                                  JavaMLReadable, JavaMLWritable):
+    """
+    .. note:: Experimental
+
+    LSH class for Euclidean distance metrics.
+    The input is dense or sparse vectors, each of which represents a point in the Euclidean
+    distance space. The output will be vectors of configurable dimension. Hash values in the same
+    dimension are calculated by the same hash function.
+
+    .. seealso:: `Stable Distributions
+        <https://en.wikipedia.org/wiki/Locality-sensitive_hashing#Stable_distributions>`_
+    .. seealso:: `Hashing for Similarity Search: A Survey <https://arxiv.org/abs/1408.2927>`_
+
+    >>> from pyspark.ml.linalg import Vectors
+    >>> from pyspark.sql.functions import col
+    >>> data = [(0, Vectors.dense([-1.0, -1.0]),),
+    ...         (1, Vectors.dense([-1.0, 1.0]),),
+    ...         (2, Vectors.dense([1.0, -1.0]),),
+    ...         (3, Vectors.dense([1.0, 1.0]),)]
+    >>> df = spark.createDataFrame(data, ["id", "features"])
+    >>> brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes",
+    ...                                   seed=12345, bucketLength=1.0)
+    >>> model = brp.fit(df)
+    >>> model.transform(df).head()
+    Row(id=0, features=DenseVector([-1.0, -1.0]), hashes=[DenseVector([-1.0])])
+    >>> data2 = [(4, Vectors.dense([2.0, 2.0]),),
+    ...          (5, Vectors.dense([2.0, 3.0]),),
+    ...          (6, Vectors.dense([3.0, 2.0]),),
+    ...          (7, Vectors.dense([3.0, 3.0]),)]
+    >>> df2 = spark.createDataFrame(data2, ["id", "features"])
+    >>> model.approxNearestNeighbors(df2, Vectors.dense([1.0, 2.0]), 1).collect()
+    [Row(id=4, features=DenseVector([2.0, 2.0]), hashes=[DenseVector([1.0])], distCol=1.0)]
+    >>> model.approxSimilarityJoin(df, df2, 3.0, distCol="EuclideanDistance").select(
+    ...     col("datasetA.id").alias("idA"),
+    ...     col("datasetB.id").alias("idB"),
+    ...     col("EuclideanDistance")).show()
+    +---+---+-----------------+
+    |idA|idB|EuclideanDistance|
+    +---+---+-----------------+
+    |  3|  6| 2.23606797749979|
+    +---+---+-----------------+
+    ...
+    >>> brpPath = temp_path + "/brp"
+    >>> brp.save(brpPath)
+    >>> brp2 = BucketedRandomProjectionLSH.load(brpPath)
+    >>> brp2.getBucketLength() == brp.getBucketLength()
+    True
+    >>> modelPath = temp_path + "/brp-model"
+    >>> model.save(modelPath)
+    >>> model2 = BucketedRandomProjectionLSHModel.load(modelPath)
+    >>> model.transform(df).head().hashes == model2.transform(df).head().hashes
+    True
+
+    .. versionadded:: 2.2.0
+    """
+
+    bucketLength = Param(Params._dummy(), "bucketLength", "the length of each hash bucket, " +
+                         "a larger bucket lowers the false negative rate.",
+                         typeConverter=TypeConverters.toFloat)
+
+    @keyword_only
+    def __init__(self, inputCol=None, outputCol=None, seed=None, numHashTables=1,
+                 bucketLength=None):
+        """
+        __init__(self, inputCol=None, outputCol=None, seed=None, numHashTables=1, \
+                 bucketLength=None)
+        """
+        super(BucketedRandomProjectionLSH, self).__init__()
+        self._java_obj = \
+            self._new_java_obj("org.apache.spark.ml.feature.BucketedRandomProjectionLSH", self.uid)
+        self._setDefault(numHashTables=1)
+        kwargs = self.__init__._input_kwargs
+        self.setParams(**kwargs)
+
+    @keyword_only
+    @since("2.2.0")
+    def setParams(self, inputCol=None, outputCol=None, seed=None, numHashTables=1,
+                  bucketLength=None):
+        """
+        setParams(self, inputCol=None, outputCol=None, seed=None, numHashTables=1, \
+                  bucketLength=None)
+        Sets params for this BucketedRandomProjectionLSH.
+        """
+        kwargs = self.setParams._input_kwargs
+        return self._set(**kwargs)
+
+    @since("2.2.0")
+    def setBucketLength(self, value):
+        """
+        Sets the value of :py:attr:`bucketLength`.
+        """
+        return self._set(bucketLength=value)
+
+    @since("2.2.0")
+    def getBucketLength(self):
+        """
+        Gets the value of bucketLength or its default value.
+        """
+        return self.getOrDefault(self.bucketLength)
+
+    def _create_model(self, java_model):
+        return BucketedRandomProjectionLSHModel(java_model)
+
+
+class BucketedRandomProjectionLSHModel(LSHModel, JavaMLReadable, JavaMLWritable):
+    """
+    .. note:: Experimental
+
+    Model fitted by :py:class:`BucketedRandomProjectionLSH`, where multiple random vectors are
+    stored. The vectors are normalized to be unit vectors and each vector is used in a hash
+    function: :math:`h_i(x) = floor(r_i \cdot x / bucketLength)` where :math:`r_i` is the
+    i-th random unit vector. The number of buckets will be `(max L2 norm of input vectors) /
+    bucketLength`.
+
+    .. versionadded:: 2.2.0
+    """
+
+
 @inherit_doc
 class Bucketizer(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
     """
@@ -754,6 +946,105 @@ class MaxAbsScalerModel(JavaModel, JavaMLReadable, JavaMLWritable):
         return self._call_java("maxAbs")
 
 
+@inherit_doc
+class MinHashLSH(JavaEstimator, LSHParams, HasInputCol, HasOutputCol, HasSeed,
+                 JavaMLReadable, JavaMLWritable):
+
+    """
+    .. note:: Experimental
+
+    LSH class for Jaccard distance.
+    The input can be dense or sparse vectors, but it is more efficient if it is sparse.
+    For example, `Vectors.sparse(10, [(2, 1.0), (3, 1.0), (5, 1.0)])` means there are 10 elements
+    in the space. This set contains elements 2, 3, and 5. Also, any input vector must have at
+    least 1 non-zero index, and all non-zero values are treated as binary "1" values.
+
+    .. seealso:: `Wikipedia on MinHash <https://en.wikipedia.org/wiki/MinHash>`_
+
+    >>> from pyspark.ml.linalg import Vectors
+    >>> from pyspark.sql.functions import col
+    >>> data = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
+    ...         (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
+    ...         (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]
+    >>> df = spark.createDataFrame(data, ["id", "features"])
+    >>> mh = MinHashLSH(inputCol="features", outputCol="hashes", seed=12345)
+    >>> model = mh.fit(df)
+    >>> model.transform(df).head()
+    Row(id=0, features=SparseVector(6, {0: 1.0, 1: 1.0, 2: 1.0}), hashes=[DenseVector([-1638925...
+    >>> data2 = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
+    ...          (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
+    ...          (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
+    >>> df2 = spark.createDataFrame(data2, ["id", "features"])
+    >>> key = Vectors.sparse(6, [1, 2], [1.0, 1.0])
+    >>> model.approxNearestNeighbors(df2, key, 1).collect()
+    [Row(id=5, features=SparseVector(6, {1: 1.0, 2: 1.0, 4: 1.0}), hashes=[DenseVector([-163892...
+    >>> model.approxSimilarityJoin(df, df2, 0.6, distCol="JaccardDistance").select(
+    ...     col("datasetA.id").alias("idA"),
+    ...     col("datasetB.id").alias("idB"),
+    ...     col("JaccardDistance")).show()
+    +---+---+---------------+
+    |idA|idB|JaccardDistance|
+    +---+---+---------------+
+    |  1|  4|            0.5|
+    |  0|  5|            0.5|
+    +---+---+---------------+
+    ...
+    >>> mhPath = temp_path + "/mh"
+    >>> mh.save(mhPath)
+    >>> mh2 = MinHashLSH.load(mhPath)
+    >>> mh2.getOutputCol() == mh.getOutputCol()
+    True
+    >>> modelPath = temp_path + "/mh-model"
+    >>> model.save(modelPath)
+    >>> model2 = MinHashLSHModel.load(modelPath)
+    >>> model.transform(df).head().hashes == model2.transform(df).head().hashes
+    True
+
+    .. versionadded:: 2.2.0
+    """
+
+    @keyword_only
+    def __init__(self, inputCol=None, outputCol=None, seed=None, numHashTables=1):
+        """
+        __init__(self, inputCol=None, outputCol=None, seed=None, numHashTables=1)
+        """
+        super(MinHashLSH, self).__init__()
+        self._java_obj = self._new_java_obj("org.apache.spark.ml.feature.MinHashLSH", self.uid)
+        self._setDefault(numHashTables=1)
+        kwargs = self.__init__._input_kwargs
+        self.setParams(**kwargs)
+
+    @keyword_only
+    @since("2.2.0")
+    def setParams(self, inputCol=None, outputCol=None, seed=None, numHashTables=1):
+        """
+        setParams(self, inputCol=None, outputCol=None, seed=None, numHashTables=1)
+        Sets params for this MinHashLSH.
+        """
+        kwargs = self.setParams._input_kwargs
+        return self._set(**kwargs)
+
+    def _create_model(self, java_model):
+        return MinHashLSHModel(java_model)
+
+
+class MinHashLSHModel(LSHModel, JavaMLReadable, JavaMLWritable):
+    """
+    .. note:: Experimental
+
+    Model produced by :py:class:`MinHashLSH`, where multiple hash functions are stored. Each
+    hash function is picked from the following family of hash functions, where :math:`a_i` and
+    :math:`b_i` are randomly chosen integers less than prime:
+    :math:`h_i(x) = ((x \cdot a_i + b_i) \mod prime)` This hash family is approximately min-wise
+    independent according to the reference.
+
+    .. seealso:: Tom Bohman, Colin Cooper, and Alan Frieze. "Min-wise independent linear \
+        permutations." Electronic Journal of Combinatorics 7 (2000): R26.
+
+    .. versionadded:: 2.2.0
+    """
+
+
 @inherit_doc
 class MinMaxScaler(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
     """
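
As a rough illustration of how the Python API added by this patch is meant to be used outside of
doctests, here is a minimal, self-contained driver in the spirit of the example scripts referenced
in the commit message. The file name `lsh_demo.py`, the input data, and the chosen parameter values
(`bucketLength`, `numHashTables`, the join threshold) are illustrative assumptions, not the
contents of the bundled examples; only the class and method names come from the patch itself.

```python
# lsh_demo.py -- hypothetical stand-alone driver (not the bundled example script).
# Run with `bin/spark-submit lsh_demo.py` against a Spark build that includes this patch.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import BucketedRandomProjectionLSH, MinHashLSH

spark = SparkSession.builder.appName("LSHDemo").getOrCreate()

# Two small datasets of 2-D points (illustrative data only).
dfA = spark.createDataFrame([(0, Vectors.dense([1.0, 1.0])),
                             (1, Vectors.dense([1.0, -1.0])),
                             (2, Vectors.dense([-1.0, -1.0]))], ["id", "features"])
dfB = spark.createDataFrame([(3, Vectors.dense([1.0, 0.0])),
                             (4, Vectors.dense([-1.0, 0.0]))], ["id", "features"])

# Euclidean-distance LSH: fit random-projection hash tables, then use the model for an
# approximate similarity join and an approximate nearest-neighbor search.
brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes",
                                  bucketLength=2.0, numHashTables=3, seed=12345)
brp_model = brp.fit(dfA)
brp_model.approxSimilarityJoin(dfA, dfB, 1.5, distCol="EuclideanDistance") \
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("EuclideanDistance")).show()
brp_model.approxNearestNeighbors(dfB, Vectors.dense([1.0, 1.0]), 2).show()

# Jaccard-distance LSH on binary sparse vectors (each vector must have a non-zero entry).
dfC = spark.createDataFrame([(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0])),
                             (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]))],
                            ["id", "features"])
mh_model = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=2).fit(dfC)
mh_model.transform(dfC).show(truncate=False)

spark.stop()
```

As in the doctests above, `approxSimilarityJoin` returns the original rows nested under the
`datasetA` and `datasetB` struct columns, so the joined fields are pulled out with
`col("datasetA.id")`-style selections, and the distance lands in the column named by `distCol`.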