about summary refs log tree commit diff
path: root/python/pyspark/mllib/recommendation.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/mllib/recommendation.py')
-rw-r--r--python/pyspark/mllib/recommendation.py69
1 file changed, 53 insertions(+), 16 deletions(-)
diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py
index 2df23394da..59c1c5ff0c 100644
--- a/python/pyspark/mllib/recommendation.py
+++ b/python/pyspark/mllib/recommendation.py
@@ -16,17 +16,25 @@
#
from pyspark import SparkContext
-from pyspark.mllib._common import \
- _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
- _serialize_double_matrix, _deserialize_double_matrix, \
- _serialize_double_vector, _deserialize_double_vector, \
- _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
- _serialize_tuple, RatingDeserializer
+from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
from pyspark.rdd import RDD
__all__ = ['MatrixFactorizationModel', 'ALS']
+class Rating(object):
+ def __init__(self, user, product, rating):
+ self.user = int(user)
+ self.product = int(product)
+ self.rating = float(rating)
+
+ def __reduce__(self):
+ return Rating, (self.user, self.product, self.rating)
+
+ def __repr__(self):
+ return "Rating(%d, %d, %d)" % (self.user, self.product, self.rating)
+
+
class MatrixFactorizationModel(object):
"""A matrix factorisation model trained by regularized alternating
@@ -39,7 +47,9 @@ class MatrixFactorizationModel(object):
>>> model = ALS.trainImplicit(ratings, 1)
>>> model.predict(2,2) is not None
True
+
>>> testset = sc.parallelize([(1, 2), (1, 1)])
+ >>> model = ALS.train(ratings, 1)
>>> model.predictAll(testset).count() == 2
True
"""
@@ -54,34 +64,61 @@ class MatrixFactorizationModel(object):
def predict(self, user, product):
    # Single (user, product) prediction, delegated straight to the
    # JVM-side model. Returns whatever the Java model returns —
    # presumably a float rating; confirm against PythonMLLibAPI.
    return self._java_model.predict(user, product)
- def predictAll(self, usersProducts):
- usersProductsJRDD = _get_unmangled_rdd(usersProducts, _serialize_tuple)
- return RDD(self._java_model.predict(usersProductsJRDD._jrdd),
- self._context, RatingDeserializer())
+ def predictAll(self, user_product):
+ assert isinstance(user_product, RDD), "user_product should be RDD of (user, product)"
+ first = user_product.first()
+ if isinstance(first, list):
+ user_product = user_product.map(tuple)
+ first = tuple(first)
+ assert type(first) is tuple and len(first) == 2, \
+ "user_product should be RDD of (user, product)"
+ if any(isinstance(x, str) for x in first):
+ user_product = user_product.map(lambda (u, p): (int(x), int(p)))
+ first = tuple(map(int, first))
+ assert all(type(x) is int for x in first), "user and product in user_product shoul be int"
+ sc = self._context
+ tuplerdd = sc._jvm.SerDe.asTupleRDD(user_product._to_java_object_rdd().rdd())
+ jresult = self._java_model.predict(tuplerdd).toJavaRDD()
+ return RDD(sc._jvm.PythonRDD.javaToPython(jresult), sc,
+ AutoBatchedSerializer(PickleSerializer()))
class ALS(object):
@classmethod
+    def _prepare(cls, ratings):
+        """Validate `ratings` and hand it to the JVM as a Java object RDD.
+
+        Accepts an RDD of Rating, or of (user, product, rating)
+        tuples/lists which are wrapped into Rating. Raises ValueError
+        for anything else. The RDD is cached before conversion.
+        """
+        assert isinstance(ratings, RDD), "ratings should be RDD"
+        first = ratings.first()
+        if not isinstance(first, Rating):
+            if isinstance(first, (tuple, list)):
+                ratings = ratings.map(lambda x: Rating(*x))
+            else:
+                raise ValueError("rating should be RDD of Rating or tuple/list")
+        # serialize them by AutoBatchedSerializer before cache to reduce the
+        # objects overhead in JVM
+        cached = ratings._reserialize(AutoBatchedSerializer(PickleSerializer())).cache()
+        return cached._to_java_object_rdd()
+
+    @classmethod
    def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1):
+        # Train an explicit-feedback ALS model: validate/convert the ratings
+        # via _prepare, then build the model on the JVM side.
        sc = ratings.context
-        ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
-        mod = sc._jvm.PythonMLLibAPI().trainALSModel(
-            ratingBytes._jrdd, rank, iterations, lambda_, blocks)
+        jrating = cls._prepare(ratings)
+        mod = sc._jvm.PythonMLLibAPI().trainALSModel(jrating, rank, iterations, lambda_, blocks)
        return MatrixFactorizationModel(sc, mod)
@classmethod
    def trainImplicit(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01):
+        # Implicit-feedback variant of train(); alpha is forwarded to the JVM
+        # trainer — presumably the confidence weighting parameter, confirm
+        # against PythonMLLibAPI.trainImplicitALSModel.
        sc = ratings.context
-        ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
+        jrating = cls._prepare(ratings)
        mod = sc._jvm.PythonMLLibAPI().trainImplicitALSModel(
-            ratingBytes._jrdd, rank, iterations, lambda_, blocks, alpha)
+            jrating, rank, iterations, lambda_, blocks, alpha)
        return MatrixFactorizationModel(sc, mod)
def _test():
import doctest
- globs = globals().copy()
+ import pyspark.mllib.recommendation
+ globs = pyspark.mllib.recommendation.__dict__.copy()
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()