From 9309ddfc3b9cca3780555fb3ac52d96343cb9545 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 21 Nov 2014 15:02:31 -0800 Subject: [SPARK-4531] [MLlib] cache serialized java object The Pyrolite is pretty slow (comparing to the adhoc serializer in 1.1), it cause much performance regression in 1.2, because we cache the serialized Python object in JVM, deserialize them into Java object in each step. This PR change to cache the deserialized JavaRDD instead of PythonRDD to avoid the deserialization of Pyrolite. It should have similar memory usage as before, but much faster. Author: Davies Liu Closes #3397 from davies/cache and squashes the following commits: 7f6e6ce [Davies Liu] Update -> Updater 4b52edd [Davies Liu] using named argument 63b984e [Davies Liu] fix 7da0332 [Davies Liu] add unpersist() dff33e1 [Davies Liu] address comments c2bdfc2 [Davies Liu] refactor d572f00 [Davies Liu] Merge branch 'master' into cache f1063e1 [Davies Liu] cache serialized java object (cherry picked from commit ce95bd8e130b2c7688b94be40683bdd90d86012d) Signed-off-by: Xiangrui Meng --- python/pyspark/mllib/clustering.py | 8 +++----- python/pyspark/mllib/common.py | 4 +--- python/pyspark/mllib/recommendation.py | 4 ++-- python/pyspark/mllib/regression.py | 5 ++--- 4 files changed, 8 insertions(+), 13 deletions(-) (limited to 'python/pyspark') diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index fe4c4cc509..e2492eef5b 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -16,7 +16,7 @@ # from pyspark import SparkContext -from pyspark.mllib.common import callMLlibFunc, callJavaFunc, _to_java_object_rdd +from pyspark.mllib.common import callMLlibFunc, callJavaFunc from pyspark.mllib.linalg import SparseVector, _convert_to_vector __all__ = ['KMeansModel', 'KMeans'] @@ -80,10 +80,8 @@ class KMeans(object): @classmethod def train(cls, rdd, k, maxIterations=100, runs=1, initializationMode="k-means||"): """Train a k-means clustering model.""" - # cache serialized data to avoid objects over head in JVM - jcached = _to_java_object_rdd(rdd.map(_convert_to_vector), cache=True) - model = callMLlibFunc("trainKMeansModel", jcached, k, maxIterations, runs, - initializationMode) + model = callMLlibFunc("trainKMeansModel", rdd.map(_convert_to_vector), k, maxIterations, + runs, initializationMode) centers = callJavaFunc(rdd.context, model.clusterCenters) return KMeansModel([c.toArray() for c in centers]) diff --git a/python/pyspark/mllib/common.py b/python/pyspark/mllib/common.py index c6149fe391..33c49e2399 100644 --- a/python/pyspark/mllib/common.py +++ b/python/pyspark/mllib/common.py @@ -54,15 +54,13 @@ _picklable_classes = [ # this will call the MLlib version of pythonToJava() -def _to_java_object_rdd(rdd, cache=False): +def _to_java_object_rdd(rdd): """ Return an JavaRDD of Object by unpickling It will convert each Python object into Java object by Pyrolite, whenever the RDD is serialized in batch or not. """ rdd = rdd._reserialize(AutoBatchedSerializer(PickleSerializer())) - if cache: - rdd.cache() return rdd.ctx._jvm.SerDe.pythonToJava(rdd._jrdd, True) diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py index 2bcbf2aaf8..97ec74eda0 100644 --- a/python/pyspark/mllib/recommendation.py +++ b/python/pyspark/mllib/recommendation.py @@ -19,7 +19,7 @@ from collections import namedtuple from pyspark import SparkContext from pyspark.rdd import RDD -from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, _to_java_object_rdd +from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc __all__ = ['MatrixFactorizationModel', 'ALS', 'Rating'] @@ -110,7 +110,7 @@ class ALS(object): ratings = ratings.map(lambda x: Rating(*x)) else: raise ValueError("rating should be RDD of Rating or tuple/list") - return _to_java_object_rdd(ratings, True) + return ratings @classmethod def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, nonnegative=False, diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index f4f5e615fa..210060140f 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -18,7 +18,7 @@ import numpy as np from numpy import array -from pyspark.mllib.common import callMLlibFunc, _to_java_object_rdd +from pyspark.mllib.common import callMLlibFunc from pyspark.mllib.linalg import SparseVector, _convert_to_vector __all__ = ['LabeledPoint', 'LinearModel', 'LinearRegressionModel', 'RidgeRegressionModel', @@ -129,8 +129,7 @@ def _regression_train_wrapper(train_func, modelClass, data, initial_weights): if not isinstance(first, LabeledPoint): raise ValueError("data should be an RDD of LabeledPoint, but got %s" % first) initial_weights = initial_weights or [0.0] * len(data.first().features) - weights, intercept = train_func(_to_java_object_rdd(data, cache=True), - _convert_to_vector(initial_weights)) + weights, intercept = train_func(data, _convert_to_vector(initial_weights)) return modelClass(weights, intercept) -- cgit v1.2.3