path: root/python/pyspark/ml
author     Burak Yavuz <brkyvz@gmail.com>          2015-05-08 17:24:32 -0700
committer  Xiangrui Meng <meng@databricks.com>     2015-05-08 17:24:32 -0700
commit     84bf931f36edf1f319c9116f7f326959a6118991 (patch)
tree       ed7930e9cc5fe6855026689f5d0e4ebdfebb4832 /python/pyspark/ml
parent     54e6fa0563ffa8788ec2fd1b8740445ef3c2ce5a (diff)
[SPARK-7488] [ML] Feature Parity in PySpark for ml.recommendation
Adds a Python API for `ALS` under `ml.recommendation` in PySpark. Also adds seed as a settable parameter in the Scala implementation of ALS.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #6015 from brkyvz/ml-rec and squashes the following commits:

be6e931 [Burak Yavuz] addressed comments
eaed879 [Burak Yavuz] readd numFeatures
0bd66b1 [Burak Yavuz] fixed seed
7f6d964 [Burak Yavuz] merged master
52e2bda [Burak Yavuz] added ALS
Diffstat (limited to 'python/pyspark/ml')
-rw-r--r--  python/pyspark/ml/param/_shared_params_code_gen.py |   2
-rw-r--r--  python/pyspark/ml/param/shared.py                  |  29
-rw-r--r--  python/pyspark/ml/recommendation.py                | 279
3 files changed, 310 insertions(+), 0 deletions(-)
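For orientation before the diffs: the new Python ALS follows the same estimator/model split as the rest of pyspark.ml. A minimal usage sketch, put together from the doctest added in recommendation.py below (the ratings data and the local context setup are illustrative, not part of this patch):

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.recommendation import ALS

sc = SparkContext("local[2]", "als-example")
sqlContext = SQLContext(sc)

# (user, item, rating) triples; column names must match userCol/itemCol/ratingCol
df = sqlContext.createDataFrame(
    [(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0)],
    ["user", "item", "rating"])

als = ALS(rank=10, maxIter=5)    # estimator, configured via params
model = als.fit(df)              # returns an ALSModel

test = sqlContext.createDataFrame([(0, 2), (1, 0)], ["user", "item"])
model.transform(test).show()     # appends a "prediction" column
sc.stop()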
diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py
index ee901f2584..ed3171b697 100644
--- a/python/pyspark/ml/param/_shared_params_code_gen.py
+++ b/python/pyspark/ml/param/_shared_params_code_gen.py
@@ -97,6 +97,8 @@ if __name__ == "__main__":
("inputCol", "input column name", None),
("inputCols", "input column names", None),
("outputCol", "output column name", None),
+ ("numFeatures", "number of features", None),
+ ("checkpointInterval", "checkpoint interval (>= 1)", None),
("seed", "random seed", None),
("tol", "the convergence tolerance for iterative algorithms", None),
("stepSize", "Step size to be used for each iteration of optimization.", None)]
diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py
index 5e7529c1dc..d0bcadee22 100644
--- a/python/pyspark/ml/param/shared.py
+++ b/python/pyspark/ml/param/shared.py
@@ -310,6 +310,35 @@ class HasNumFeatures(Params):
return self.getOrDefault(self.numFeatures)
+class HasCheckpointInterval(Params):
+ """
+ Mixin for param checkpointInterval: checkpoint interval (>= 1).
+ """
+
+ # a placeholder to make it appear in the generated doc
+ checkpointInterval = Param(Params._dummy(), "checkpointInterval", "checkpoint interval (>= 1)")
+
+ def __init__(self):
+ super(HasCheckpointInterval, self).__init__()
+ #: param for checkpoint interval (>= 1)
+ self.checkpointInterval = Param(self, "checkpointInterval", "checkpoint interval (>= 1)")
+ if None is not None:  # always-false branch: the generator pastes the default (here None) in as source text
+ self._setDefault(checkpointInterval=None)
+
+ def setCheckpointInterval(self, value):
+ """
+ Sets the value of :py:attr:`checkpointInterval`.
+ """
+ self.paramMap[self.checkpointInterval] = value
+ return self
+
+ def getCheckpointInterval(self):
+ """
+ Gets the value of checkpointInterval or its default value.
+ """
+ return self.getOrDefault(self.checkpointInterval)
+
+
class HasSeed(Params):
"""
Mixin for param seed: random seed.
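Estimators pick up these params through multiple inheritance, and getOrDefault resolves an explicitly set value before falling back to the default. A small sketch of the resulting behavior, assuming a pyspark environment (values illustrative):

from pyspark.ml.recommendation import ALS

als = ALS()                          # gains HasCheckpointInterval via its bases
print(als.getCheckpointInterval())   # 10: the default set in ALS.__init__
als.setCheckpointInterval(5)         # writes into paramMap, returns self for chaining
print(als.getCheckpointInterval())   # 5: an explicit value shadows the default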
diff --git a/python/pyspark/ml/recommendation.py b/python/pyspark/ml/recommendation.py
new file mode 100644
index 0000000000..4846b907e8
--- /dev/null
+++ b/python/pyspark/ml/recommendation.py
@@ -0,0 +1,279 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark.ml.util import keyword_only
+from pyspark.ml.wrapper import JavaEstimator, JavaModel
+from pyspark.ml.param.shared import *
+from pyspark.mllib.common import inherit_doc
+
+
+__all__ = ['ALS', 'ALSModel']
+
+
+@inherit_doc
+class ALS(JavaEstimator, HasCheckpointInterval, HasMaxIter, HasPredictionCol, HasRegParam, HasSeed):
+ """
+ Alternating Least Squares (ALS) matrix factorization.
+
+ ALS attempts to estimate the ratings matrix `R` as the product of
+ two lower-rank matrices, `X` and `Y`, i.e. `X * Yt = R`. Typically
+ these approximations are called 'factor' matrices. The general
+ approach is iterative. During each iteration, one of the factor
+ matrices is held constant, while the other is solved for using least
+ squares. The newly-solved factor matrix is then held constant while
+ solving for the other factor matrix.
+
+ This is a blocked implementation of the ALS factorization algorithm
+ that groups the two sets of factors (referred to as "users" and
+ "products") into blocks and reduces communication by only sending
+ one copy of each user vector to each product block on each
+ iteration, and only for the product blocks that need that user's
+ feature vector. This is achieved by pre-computing some information
+ about the ratings matrix to determine the "out-links" of each user
+ (which blocks of products it will contribute to) and "in-link"
+ information for each product (which of the feature vectors it
+ receives from each user block it will depend on). This allows us to
+ send only an array of feature vectors between each user block and
+ product block, and have the product block find the users' ratings
+ and update the products based on these messages.
+
+ For implicit preference data, the algorithm used is based on
+ "Collaborative Filtering for Implicit Feedback Datasets", available
+ at `http://dx.doi.org/10.1109/ICDM.2008.22`, adapted for the blocked
+ approach used here.
+
+ Essentially, instead of finding the low-rank approximations to the
+ rating matrix `R`, this finds the approximations for a preference
+ matrix `P` where the elements of `P` are 1 if r > 0 and 0 if r <= 0.
+ The ratings then act as 'confidence' values related to strength of
+ indicated user preferences rather than explicit ratings given to
+ items.
+
+ >>> als = ALS(rank=10, maxIter=5)
+ >>> model = als.fit(df)
+ >>> test = sqlContext.createDataFrame([(0, 2), (1, 0), (2, 0)], ["user", "item"])
+ >>> predictions = sorted(model.transform(test).collect(), key=lambda r: r[0])
+ >>> predictions[0]
+ Row(user=0, item=2, prediction=0.39...)
+ >>> predictions[1]
+ Row(user=1, item=0, prediction=3.19...)
+ >>> predictions[2]
+ Row(user=2, item=0, prediction=-1.15...)
+ """
+ _java_class = "org.apache.spark.ml.recommendation.ALS"
+ # a placeholder to make it appear in the generated doc
+ rank = Param(Params._dummy(), "rank", "rank of the factorization")
+ numUserBlocks = Param(Params._dummy(), "numUserBlocks", "number of user blocks")
+ numItemBlocks = Param(Params._dummy(), "numItemBlocks", "number of item blocks")
+ implicitPrefs = Param(Params._dummy(), "implicitPrefs", "whether to use implicit preference")
+ alpha = Param(Params._dummy(), "alpha", "alpha for implicit preference")
+ userCol = Param(Params._dummy(), "userCol", "column name for user ids")
+ itemCol = Param(Params._dummy(), "itemCol", "column name for item ids")
+ ratingCol = Param(Params._dummy(), "ratingCol", "column name for ratings")
+ nonnegative = Param(Params._dummy(), "nonnegative",
+ "whether to use nonnegative constraint for least squares")
+
+ @keyword_only
+ def __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10,
+ implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=0,
+ ratingCol="rating", nonnegative=False, checkpointInterval=10):
+ """
+ __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10,
+ implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=0,
+ ratingCol="rating", nonnegative=False, checkpointInterval=10)
+ """
+ super(ALS, self).__init__()
+ self.rank = Param(self, "rank", "rank of the factorization")
+ self.numUserBlocks = Param(self, "numUserBlocks", "number of user blocks")
+ self.numItemBlocks = Param(self, "numItemBlocks", "number of item blocks")
+ self.implicitPrefs = Param(self, "implicitPrefs", "whether to use implicit preference")
+ self.alpha = Param(self, "alpha", "alpha for implicit preference")
+ self.userCol = Param(self, "userCol", "column name for user ids")
+ self.itemCol = Param(self, "itemCol", "column name for item ids")
+ self.ratingCol = Param(self, "ratingCol", "column name for ratings")
+ self.nonnegative = Param(self, "nonnegative",
+ "whether to use nonnegative constraint for least squares")
+ self._setDefault(rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10,
+ implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=0,
+ ratingCol="rating", nonnegative=False, checkpointInterval=10)
+ kwargs = self.__init__._input_kwargs
+ self.setParams(**kwargs)
+
+ @keyword_only
+ def setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10,
+ implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=0,
+ ratingCol="rating", nonnegative=False, checkpointInterval=10):
+ """
+ setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10,
+ implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=0,
+ ratingCol="rating", nonnegative=False, checkpointInterval=10)
+ Sets params for ALS.
+ """
+ kwargs = self.setParams._input_kwargs
+ return self._set(**kwargs)
+
+ def _create_model(self, java_model):
+ return ALSModel(java_model)
+
+ def setRank(self, value):
+ """
+ Sets the value of :py:attr:`rank`.
+ """
+ self.paramMap[self.rank] = value
+ return self
+
+ def getRank(self):
+ """
+ Gets the value of rank or its default value.
+ """
+ return self.getOrDefault(self.rank)
+
+ def setNumUserBlocks(self, value):
+ """
+ Sets the value of :py:attr:`numUserBlocks`.
+ """
+ self.paramMap[self.numUserBlocks] = value
+ return self
+
+ def getNumUserBlocks(self):
+ """
+ Gets the value of numUserBlocks or its default value.
+ """
+ return self.getOrDefault(self.numUserBlocks)
+
+ def setNumItemBlocks(self, value):
+ """
+ Sets the value of :py:attr:`numItemBlocks`.
+ """
+ self.paramMap[self.numItemBlocks] = value
+ return self
+
+ def getNumItemBlocks(self):
+ """
+ Gets the value of numItemBlocks or its default value.
+ """
+ return self.getOrDefault(self.numItemBlocks)
+
+ def setNumBlocks(self, value):
+ """
+ Sets both :py:attr:`numUserBlocks` and :py:attr:`numItemBlocks` to the specified value.
+ """
+ self.paramMap[self.numUserBlocks] = value
+ self.paramMap[self.numItemBlocks] = value
+ return self
+
+ def setImplicitPrefs(self, value):
+ """
+ Sets the value of :py:attr:`implicitPrefs`.
+ """
+ self.paramMap[self.implicitPrefs] = value
+ return self
+
+ def getImplicitPrefs(self):
+ """
+ Gets the value of implicitPrefs or its default value.
+ """
+ return self.getOrDefault(self.implicitPrefs)
+
+ def setAlpha(self, value):
+ """
+ Sets the value of :py:attr:`alpha`.
+ """
+ self.paramMap[self.alpha] = value
+ return self
+
+ def getAlpha(self):
+ """
+ Gets the value of alpha or its default value.
+ """
+ return self.getOrDefault(self.alpha)
+
+ def setUserCol(self, value):
+ """
+ Sets the value of :py:attr:`userCol`.
+ """
+ self.paramMap[self.userCol] = value
+ return self
+
+ def getUserCol(self):
+ """
+ Gets the value of userCol or its default value.
+ """
+ return self.getOrDefault(self.userCol)
+
+ def setItemCol(self, value):
+ """
+ Sets the value of :py:attr:`itemCol`.
+ """
+ self.paramMap[self.itemCol] = value
+ return self
+
+ def getItemCol(self):
+ """
+ Gets the value of itemCol or its default value.
+ """
+ return self.getOrDefault(self.itemCol)
+
+ def setRatingCol(self, value):
+ """
+ Sets the value of :py:attr:`ratingCol`.
+ """
+ self.paramMap[self.ratingCol] = value
+ return self
+
+ def getRatingCol(self):
+ """
+ Gets the value of ratingCol or its default value.
+ """
+ return self.getOrDefault(self.ratingCol)
+
+ def setNonnegative(self, value):
+ """
+ Sets the value of :py:attr:`nonnegative`.
+ """
+ self.paramMap[self.nonnegative] = value
+ return self
+
+ def getNonnegative(self):
+ """
+ Gets the value of nonnegative or its default value.
+ """
+ return self.getOrDefault(self.nonnegative)
+
+
+class ALSModel(JavaModel):
+ """
+ Model fitted by ALS.
+ """
+
+
+if __name__ == "__main__":
+ import doctest
+ from pyspark.context import SparkContext
+ from pyspark.sql import SQLContext
+ globs = globals().copy()
+ sc = SparkContext("local[2]", "ml.recommendation tests")
+ sqlContext = SQLContext(sc)
+ globs['sc'] = sc
+ globs['sqlContext'] = sqlContext
+ globs['df'] = sqlContext.createDataFrame([(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0),
+ (2, 1, 1.0), (2, 2, 5.0)], ["user", "item", "rating"])
+ (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
+ sc.stop()
+ if failure_count:
+ exit(-1)
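Finally, to illustrate the implicit-feedback mode described in the class docstring: a hedged sketch of fitting on interaction counts rather than explicit ratings (the data, the alpha value, and the sqlContext in scope are illustrative, as in the doctest setup above):

from pyspark.ml.recommendation import ALS

# With implicitPrefs=True the "rating" column is read as interaction
# strength (e.g. click counts); ALS fits the binarized preference matrix P
# and uses the raw values, scaled by alpha, as per-entry confidences.
clicks = sqlContext.createDataFrame(
    [(0, 0, 3.0), (0, 1, 1.0), (1, 1, 8.0), (2, 2, 2.0)],
    ["user", "item", "rating"])

als = ALS(implicitPrefs=True, alpha=40.0, rank=10, maxIter=5)
model = als.fit(clicks)
model.transform(clicks).show()   # predictions approximate preferences, not ratings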