diff options
Diffstat (limited to 'python/pyspark/mllib/recommendation.py')
-rw-r--r-- | python/pyspark/mllib/recommendation.py | 7 |
1 files changed, 4 insertions, 3 deletions
diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py index 59c1c5ff0c..17f96b8700 100644 --- a/python/pyspark/mllib/recommendation.py +++ b/python/pyspark/mllib/recommendation.py @@ -18,6 +18,7 @@ from pyspark import SparkContext from pyspark.serializers import PickleSerializer, AutoBatchedSerializer from pyspark.rdd import RDD +from pyspark.mllib.linalg import _to_java_object_rdd __all__ = ['MatrixFactorizationModel', 'ALS'] @@ -77,9 +78,9 @@ class MatrixFactorizationModel(object): first = tuple(map(int, first)) assert all(type(x) is int for x in first), "user and product in user_product shoul be int" sc = self._context - tuplerdd = sc._jvm.SerDe.asTupleRDD(user_product._to_java_object_rdd().rdd()) + tuplerdd = sc._jvm.SerDe.asTupleRDD(_to_java_object_rdd(user_product).rdd()) jresult = self._java_model.predict(tuplerdd).toJavaRDD() - return RDD(sc._jvm.PythonRDD.javaToPython(jresult), sc, + return RDD(sc._jvm.SerDe.javaToPython(jresult), sc, AutoBatchedSerializer(PickleSerializer())) @@ -97,7 +98,7 @@ class ALS(object): # serialize them by AutoBatchedSerializer before cache to reduce the # objects overhead in JVM cached = ratings._reserialize(AutoBatchedSerializer(PickleSerializer())).cache() - return cached._to_java_object_rdd() + return _to_java_object_rdd(cached) @classmethod def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1): |