Diffstat (limited to 'python/pyspark')
 python/pyspark/mllib/clustering.py     | 8 +++-----
 python/pyspark/mllib/common.py         | 4 +---
 python/pyspark/mllib/recommendation.py | 4 ++--
 python/pyspark/mllib/regression.py     | 5 ++---
 4 files changed, 8 insertions(+), 13 deletions(-)
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index fe4c4cc509..e2492eef5b 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -16,7 +16,7 @@
#
from pyspark import SparkContext
-from pyspark.mllib.common import callMLlibFunc, callJavaFunc, _to_java_object_rdd
+from pyspark.mllib.common import callMLlibFunc, callJavaFunc
from pyspark.mllib.linalg import SparseVector, _convert_to_vector
__all__ = ['KMeansModel', 'KMeans']
@@ -80,10 +80,8 @@ class KMeans(object):
@classmethod
def train(cls, rdd, k, maxIterations=100, runs=1, initializationMode="k-means||"):
"""Train a k-means clustering model."""
- # cache serialized data to avoid object overhead in the JVM
- jcached = _to_java_object_rdd(rdd.map(_convert_to_vector), cache=True)
- model = callMLlibFunc("trainKMeansModel", jcached, k, maxIterations, runs,
- initializationMode)
+ model = callMLlibFunc("trainKMeansModel", rdd.map(_convert_to_vector), k, maxIterations,
+ runs, initializationMode)
centers = callJavaFunc(rdd.context, model.clusterCenters)
return KMeansModel([c.toArray() for c in centers])
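The public KMeans.train API is unchanged by this diff; the Python RDD of vectors now goes straight to callMLlibFunc, and the Python side no longer caches a serialized copy itself. A minimal usage sketch, assuming a live SparkContext named sc:

from pyspark.mllib.clustering import KMeans

# Four 2-D points forming two obvious clusters (illustrative data).
data = sc.parallelize([[0.0, 0.0], [1.0, 1.0], [9.0, 8.0], [8.0, 9.0]])
model = KMeans.train(data, k=2, maxIterations=10, runs=1,
                     initializationMode="k-means||")
print(model.clusterCenters)  # two centers, as NumPy arrays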
diff --git a/python/pyspark/mllib/common.py b/python/pyspark/mllib/common.py
index c6149fe391..33c49e2399 100644
--- a/python/pyspark/mllib/common.py
+++ b/python/pyspark/mllib/common.py
@@ -54,15 +54,13 @@ _picklable_classes = [
# this will call the MLlib version of pythonToJava()
-def _to_java_object_rdd(rdd, cache=False):
+def _to_java_object_rdd(rdd):
""" Return an JavaRDD of Object by unpickling
It will convert each Python object into Java object by Pyrolite, whenever the
RDD is serialized in batch or not.
"""
rdd = rdd._reserialize(AutoBatchedSerializer(PickleSerializer()))
- if cache:
- rdd.cache()
return rdd.ctx._jvm.SerDe.pythonToJava(rdd._jrdd, True)
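With the cache flag removed, _to_java_object_rdd only reserializes the RDD and hands it to the JVM; callers that want caching must now arrange it themselves. A sketch of the remaining behavior, assuming a live SparkContext sc (the helper is private to pyspark.mllib and is shown here only for illustration):

from pyspark.mllib.common import _to_java_object_rdd

rdd = sc.parallelize(range(100))
jrdd = _to_java_object_rdd(rdd)  # JavaRDD of Java objects; no rdd.cache() happens here anymore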
diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py
index 2bcbf2aaf8..97ec74eda0 100644
--- a/python/pyspark/mllib/recommendation.py
+++ b/python/pyspark/mllib/recommendation.py
@@ -19,7 +19,7 @@ from collections import namedtuple
from pyspark import SparkContext
from pyspark.rdd import RDD
-from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, _to_java_object_rdd
+from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc
__all__ = ['MatrixFactorizationModel', 'ALS', 'Rating']
@@ -110,7 +110,7 @@ class ALS(object):
ratings = ratings.map(lambda x: Rating(*x))
else:
raise ValueError("rating should be RDD of Rating or tuple/list")
- return _to_java_object_rdd(ratings, True)
+ return ratings
@classmethod
def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, nonnegative=False,
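ALS callers are likewise unaffected: _prepare now returns the plain RDD of Rating and leaves any Java-side conversion to callMLlibFunc. A usage sketch, assuming a live SparkContext sc:

from pyspark.mllib.recommendation import ALS, Rating

# A tiny illustrative ratings matrix: (user, product, rating) triples.
ratings = sc.parallelize([Rating(1, 1, 5.0), Rating(1, 2, 1.0),
                          Rating(2, 1, 1.0), Rating(2, 2, 5.0)])
model = ALS.train(ratings, rank=10, iterations=5)
print(model.predict(1, 1))  # predicted rating for user 1, product 1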
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index f4f5e615fa..210060140f 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -18,7 +18,7 @@
import numpy as np
from numpy import array
-from pyspark.mllib.common import callMLlibFunc, _to_java_object_rdd
+from pyspark.mllib.common import callMLlibFunc
from pyspark.mllib.linalg import SparseVector, _convert_to_vector
__all__ = ['LabeledPoint', 'LinearModel', 'LinearRegressionModel', 'RidgeRegressionModel',
@@ -129,8 +129,7 @@ def _regression_train_wrapper(train_func, modelClass, data, initial_weights):
if not isinstance(first, LabeledPoint):
raise ValueError("data should be an RDD of LabeledPoint, but got %s" % first)
initial_weights = initial_weights or [0.0] * len(data.first().features)
- weights, intercept = train_func(_to_java_object_rdd(data, cache=True),
- _convert_to_vector(initial_weights))
+ weights, intercept = train_func(data, _convert_to_vector(initial_weights))
return modelClass(weights, intercept)
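And _regression_train_wrapper now passes the LabeledPoint RDD through unconverted as well. A sketch via one of its callers, assuming a live SparkContext sc:

from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD

# Points lying on y = x, so the fitted weight should come out close to 1.0.
points = sc.parallelize([LabeledPoint(0.0, [0.0]),
                         LabeledPoint(1.0, [1.0]),
                         LabeledPoint(3.0, [3.0])])
model = LinearRegressionWithSGD.train(points, iterations=100)
print(model.weights, model.intercept)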