diff options
author | Davies Liu <davies@databricks.com> | 2014-11-21 15:02:31 -0800 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2014-11-21 15:02:31 -0800 |
commit | ce95bd8e130b2c7688b94be40683bdd90d86012d (patch) | |
tree | 396d4e26517f3fc6e84a904ca6466ffb5da2f222 /python/pyspark/mllib/clustering.py | |
parent | a81918c5a66fc6040f9796fc1a9d4e0bfb8d0cbe (diff) | |
download | spark-ce95bd8e130b2c7688b94be40683bdd90d86012d.tar.gz spark-ce95bd8e130b2c7688b94be40683bdd90d86012d.tar.bz2 spark-ce95bd8e130b2c7688b94be40683bdd90d86012d.zip |
[SPARK-4531] [MLlib] cache serialized java object
The Pyrolite is pretty slow (comparing to the adhoc serializer in 1.1), it cause much performance regression in 1.2, because we cache the serialized Python object in JVM, deserialize them into Java object in each step.
This PR change to cache the deserialized JavaRDD instead of PythonRDD to avoid the deserialization of Pyrolite. It should have similar memory usage as before, but much faster.
Author: Davies Liu <davies@databricks.com>
Closes #3397 from davies/cache and squashes the following commits:
7f6e6ce [Davies Liu] Update -> Updater
4b52edd [Davies Liu] using named argument
63b984e [Davies Liu] fix
7da0332 [Davies Liu] add unpersist()
dff33e1 [Davies Liu] address comments
c2bdfc2 [Davies Liu] refactor
d572f00 [Davies Liu] Merge branch 'master' into cache
f1063e1 [Davies Liu] cache serialized java object
Diffstat (limited to 'python/pyspark/mllib/clustering.py')
-rw-r--r-- | python/pyspark/mllib/clustering.py | 8 |
1 files changed, 3 insertions, 5 deletions
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index fe4c4cc509..e2492eef5b 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -16,7 +16,7 @@ # from pyspark import SparkContext -from pyspark.mllib.common import callMLlibFunc, callJavaFunc, _to_java_object_rdd +from pyspark.mllib.common import callMLlibFunc, callJavaFunc from pyspark.mllib.linalg import SparseVector, _convert_to_vector __all__ = ['KMeansModel', 'KMeans'] @@ -80,10 +80,8 @@ class KMeans(object): @classmethod def train(cls, rdd, k, maxIterations=100, runs=1, initializationMode="k-means||"): """Train a k-means clustering model.""" - # cache serialized data to avoid objects over head in JVM - jcached = _to_java_object_rdd(rdd.map(_convert_to_vector), cache=True) - model = callMLlibFunc("trainKMeansModel", jcached, k, maxIterations, runs, - initializationMode) + model = callMLlibFunc("trainKMeansModel", rdd.map(_convert_to_vector), k, maxIterations, + runs, initializationMode) centers = callJavaFunc(rdd.context, model.clusterCenters) return KMeansModel([c.toArray() for c in centers]) |