aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/mllib/recommendation.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/mllib/recommendation.py')
-rw-r--r--  python/pyspark/mllib/recommendation.py | 7
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py
index 59c1c5ff0c..17f96b8700 100644
--- a/python/pyspark/mllib/recommendation.py
+++ b/python/pyspark/mllib/recommendation.py
@@ -18,6 +18,7 @@
from pyspark import SparkContext
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
from pyspark.rdd import RDD
+from pyspark.mllib.linalg import _to_java_object_rdd
__all__ = ['MatrixFactorizationModel', 'ALS']
@@ -77,9 +78,9 @@ class MatrixFactorizationModel(object):
first = tuple(map(int, first))
assert all(type(x) is int for x in first), "user and product in user_product shoul be int"
sc = self._context
- tuplerdd = sc._jvm.SerDe.asTupleRDD(user_product._to_java_object_rdd().rdd())
+ tuplerdd = sc._jvm.SerDe.asTupleRDD(_to_java_object_rdd(user_product).rdd())
jresult = self._java_model.predict(tuplerdd).toJavaRDD()
- return RDD(sc._jvm.PythonRDD.javaToPython(jresult), sc,
+ return RDD(sc._jvm.SerDe.javaToPython(jresult), sc,
AutoBatchedSerializer(PickleSerializer()))
@@ -97,7 +98,7 @@ class ALS(object):
# serialize them by AutoBatchedSerializer before cache to reduce the
# objects overhead in JVM
cached = ratings._reserialize(AutoBatchedSerializer(PickleSerializer())).cache()
- return cached._to_java_object_rdd()
+ return _to_java_object_rdd(cached)
@classmethod
def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1):