author     Davies Liu <davies.liu@gmail.com>      2014-10-16 14:56:50 -0700
committer  Xiangrui Meng <meng@databricks.com>    2014-10-16 14:56:50 -0700
commit     091d32c52e9d73da95896016c1d920e89858abfa (patch)
tree       904edd29e64b57fa1ab72d3ca37ed2996aa9d1e4 /python
parent     4c589cac4496c6a4bb8485a340bd0641dca13847 (diff)
[SPARK-3971] [MLLib] [PySpark] hotfix: Customized pickler should work in cluster mode
Customized picklers must be registered before unpickling, but on an executor there is no way to register them before the tasks run. So we register the picklers inside the tasks themselves: duplicate javaToPython() and pythonToJava() in MLlib and call SerDe.initialize() before pickling or unpickling.

Author: Davies Liu <davies.liu@gmail.com>

Closes #2830 from davies/fix_pickle and squashes the following commits:

0c85fb9 [Davies Liu] revert the privacy change
6b94e15 [Davies Liu] use JavaConverters instead of JavaConversions
0f02050 [Davies Liu] hotfix: Customized pickler does not work in cluster
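A minimal, self-contained sketch of the call-site change this patch applies throughout pyspark.mllib (not part of the patch itself; the app name and sample data are made up, and it assumes a PySpark build that includes this fix):

    from pyspark import SparkContext
    from pyspark.mllib.linalg import Vectors, _to_java_object_rdd  # helper added in linalg.py below

    sc = SparkContext("local", "pickler-hotfix-sketch")
    data = sc.parallelize([Vectors.dense([1.0, 2.0]), Vectors.dense([3.0, 4.0])])

    # Before this patch: data._to_java_object_rdd(), which relied on the driver
    # having called SparkContext._jvm.SerDe.initialize() eagerly -- something
    # executors never do in cluster mode.

    # After this patch: a module-level helper that re-serializes the RDD in
    # pickle batches and hands it to MLlib's SerDe object, which can register
    # its custom picklers immediately before (un)pickling on any JVM.
    jrdd = _to_java_object_rdd(data)
    print(jrdd)  # py4j handle to a JavaRDD of Java objects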
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/context.py               2
-rw-r--r--  python/pyspark/mllib/classification.py  4
-rw-r--r--  python/pyspark/mllib/clustering.py      4
-rw-r--r--  python/pyspark/mllib/feature.py         5
-rw-r--r--  python/pyspark/mllib/linalg.py          13
-rw-r--r--  python/pyspark/mllib/random.py          2
-rw-r--r--  python/pyspark/mllib/recommendation.py  7
-rw-r--r--  python/pyspark/mllib/regression.py      4
-rw-r--r--  python/pyspark/mllib/stat.py            7
-rw-r--r--  python/pyspark/mllib/tree.py            8
-rw-r--r--  python/pyspark/mllib/util.py            6
11 files changed, 37 insertions(+), 25 deletions(-)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 89d2e2e5b4..8d27ccb95f 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -215,8 +215,6 @@ class SparkContext(object):
SparkContext._gateway = gateway or launch_gateway()
SparkContext._jvm = SparkContext._gateway.jvm
SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile
- SparkContext._jvm.SerDeUtil.initialize()
- SparkContext._jvm.SerDe.initialize()
if instance:
if (SparkContext._active_spark_context and
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index cd43982191..e295c9d095 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -21,7 +21,7 @@ import numpy
from numpy import array
from pyspark import SparkContext, PickleSerializer
-from pyspark.mllib.linalg import SparseVector, _convert_to_vector
+from pyspark.mllib.linalg import SparseVector, _convert_to_vector, _to_java_object_rdd
from pyspark.mllib.regression import LabeledPoint, LinearModel, _regression_train_wrapper
@@ -244,7 +244,7 @@ class NaiveBayes(object):
:param lambda_: The smoothing parameter
"""
sc = data.context
- jlist = sc._jvm.PythonMLLibAPI().trainNaiveBayes(data._to_java_object_rdd(), lambda_)
+ jlist = sc._jvm.PythonMLLibAPI().trainNaiveBayes(_to_java_object_rdd(data), lambda_)
labels, pi, theta = PickleSerializer().loads(str(sc._jvm.SerDe.dumps(jlist)))
return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index 12c5602271..5ee7997104 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -17,7 +17,7 @@
from pyspark import SparkContext
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
-from pyspark.mllib.linalg import SparseVector, _convert_to_vector
+from pyspark.mllib.linalg import SparseVector, _convert_to_vector, _to_java_object_rdd
__all__ = ['KMeansModel', 'KMeans']
@@ -85,7 +85,7 @@ class KMeans(object):
# cache serialized data to avoid objects over head in JVM
cached = rdd.map(_convert_to_vector)._reserialize(AutoBatchedSerializer(ser)).cache()
model = sc._jvm.PythonMLLibAPI().trainKMeansModel(
- cached._to_java_object_rdd(), k, maxIterations, runs, initializationMode)
+ _to_java_object_rdd(cached), k, maxIterations, runs, initializationMode)
bytes = sc._jvm.SerDe.dumps(model.clusterCenters())
centers = ser.loads(str(bytes))
return KMeansModel([c.toArray() for c in centers])
diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py
index f4cbf31b94..b5a3f22c69 100644
--- a/python/pyspark/mllib/feature.py
+++ b/python/pyspark/mllib/feature.py
@@ -19,8 +19,7 @@
Python package for feature in MLlib.
"""
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
-
-from pyspark.mllib.linalg import _convert_to_vector
+from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd
__all__ = ['Word2Vec', 'Word2VecModel']
@@ -176,7 +175,7 @@ class Word2Vec(object):
seed = self.seed
model = sc._jvm.PythonMLLibAPI().trainWord2Vec(
- data._to_java_object_rdd(), vectorSize,
+ _to_java_object_rdd(data), vectorSize,
learningRate, numPartitions, numIterations, seed)
return Word2VecModel(sc, model)
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
index 24c5480b2f..773d8d3938 100644
--- a/python/pyspark/mllib/linalg.py
+++ b/python/pyspark/mllib/linalg.py
@@ -29,6 +29,8 @@ import copy_reg
import numpy as np
+from pyspark.serializers import AutoBatchedSerializer, PickleSerializer
+
__all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors']
@@ -50,6 +52,17 @@ except:
_have_scipy = False
+# this will call the MLlib version of pythonToJava()
+def _to_java_object_rdd(rdd):
+ """ Return an JavaRDD of Object by unpickling
+
+ It will convert each Python object into Java object by Pyrolite, whenever the
+ RDD is serialized in batch or not.
+ """
+ rdd = rdd._reserialize(AutoBatchedSerializer(PickleSerializer()))
+ return rdd.ctx._jvm.SerDe.pythonToJava(rdd._jrdd, True)
+
+
def _convert_to_vector(l):
if isinstance(l, Vector):
return l
diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py
index a787e4dea2..73baba4ace 100644
--- a/python/pyspark/mllib/random.py
+++ b/python/pyspark/mllib/random.py
@@ -32,7 +32,7 @@ def serialize(f):
@wraps(f)
def func(sc, *a, **kw):
jrdd = f(sc, *a, **kw)
- return RDD(sc._jvm.PythonRDD.javaToPython(jrdd), sc,
+ return RDD(sc._jvm.SerDe.javaToPython(jrdd), sc,
BatchedSerializer(PickleSerializer(), 1024))
return func
diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py
index 59c1c5ff0c..17f96b8700 100644
--- a/python/pyspark/mllib/recommendation.py
+++ b/python/pyspark/mllib/recommendation.py
@@ -18,6 +18,7 @@
from pyspark import SparkContext
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
from pyspark.rdd import RDD
+from pyspark.mllib.linalg import _to_java_object_rdd
__all__ = ['MatrixFactorizationModel', 'ALS']
@@ -77,9 +78,9 @@ class MatrixFactorizationModel(object):
first = tuple(map(int, first))
assert all(type(x) is int for x in first), "user and product in user_product shoul be int"
sc = self._context
- tuplerdd = sc._jvm.SerDe.asTupleRDD(user_product._to_java_object_rdd().rdd())
+ tuplerdd = sc._jvm.SerDe.asTupleRDD(_to_java_object_rdd(user_product).rdd())
jresult = self._java_model.predict(tuplerdd).toJavaRDD()
- return RDD(sc._jvm.PythonRDD.javaToPython(jresult), sc,
+ return RDD(sc._jvm.SerDe.javaToPython(jresult), sc,
AutoBatchedSerializer(PickleSerializer()))
@@ -97,7 +98,7 @@ class ALS(object):
# serialize them by AutoBatchedSerializer before cache to reduce the
# objects overhead in JVM
cached = ratings._reserialize(AutoBatchedSerializer(PickleSerializer())).cache()
- return cached._to_java_object_rdd()
+ return _to_java_object_rdd(cached)
@classmethod
def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1):
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index 12b322aaae..93e17faf5c 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -19,8 +19,8 @@ import numpy as np
from numpy import array
from pyspark import SparkContext
-from pyspark.mllib.linalg import SparseVector, _convert_to_vector
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
+from pyspark.mllib.linalg import SparseVector, _convert_to_vector, _to_java_object_rdd
__all__ = ['LabeledPoint', 'LinearModel', 'LinearRegressionModel', 'RidgeRegressionModel',
'LinearRegressionWithSGD', 'LassoWithSGD', 'RidgeRegressionWithSGD']
@@ -131,7 +131,7 @@ def _regression_train_wrapper(sc, train_func, modelClass, data, initial_weights)
# use AutoBatchedSerializer before cache to reduce the memory
# overhead in JVM
cached = data._reserialize(AutoBatchedSerializer(ser)).cache()
- ans = train_func(cached._to_java_object_rdd(), initial_bytes)
+ ans = train_func(_to_java_object_rdd(cached), initial_bytes)
assert len(ans) == 2, "JVM call result had unexpected length"
weights = ser.loads(str(ans[0]))
return modelClass(weights, ans[1])
diff --git a/python/pyspark/mllib/stat.py b/python/pyspark/mllib/stat.py
index b9de0909a6..a6019dadf7 100644
--- a/python/pyspark/mllib/stat.py
+++ b/python/pyspark/mllib/stat.py
@@ -22,6 +22,7 @@ Python package for statistical functions in MLlib.
from functools import wraps
from pyspark import PickleSerializer
+from pyspark.mllib.linalg import _to_java_object_rdd
__all__ = ['MultivariateStatisticalSummary', 'Statistics']
@@ -106,7 +107,7 @@ class Statistics(object):
array([ 2., 0., 0., -2.])
"""
sc = rdd.ctx
- jrdd = rdd._to_java_object_rdd()
+ jrdd = _to_java_object_rdd(rdd)
cStats = sc._jvm.PythonMLLibAPI().colStats(jrdd)
return MultivariateStatisticalSummary(sc, cStats)
@@ -162,14 +163,14 @@ class Statistics(object):
if type(y) == str:
raise TypeError("Use 'method=' to specify method name.")
- jx = x._to_java_object_rdd()
+ jx = _to_java_object_rdd(x)
if not y:
resultMat = sc._jvm.PythonMLLibAPI().corr(jx, method)
bytes = sc._jvm.SerDe.dumps(resultMat)
ser = PickleSerializer()
return ser.loads(str(bytes)).toArray()
else:
- jy = y._to_java_object_rdd()
+ jy = _to_java_object_rdd(y)
return sc._jvm.PythonMLLibAPI().corr(jx, jy, method)
diff --git a/python/pyspark/mllib/tree.py b/python/pyspark/mllib/tree.py
index 5d7abfb96b..0938eebd3a 100644
--- a/python/pyspark/mllib/tree.py
+++ b/python/pyspark/mllib/tree.py
@@ -19,7 +19,7 @@ from py4j.java_collections import MapConverter
from pyspark import SparkContext, RDD
from pyspark.serializers import BatchedSerializer, PickleSerializer
-from pyspark.mllib.linalg import Vector, _convert_to_vector
+from pyspark.mllib.linalg import Vector, _convert_to_vector, _to_java_object_rdd
from pyspark.mllib.regression import LabeledPoint
__all__ = ['DecisionTreeModel', 'DecisionTree']
@@ -61,8 +61,8 @@ class DecisionTreeModel(object):
return self._sc.parallelize([])
if not isinstance(first[0], Vector):
x = x.map(_convert_to_vector)
- jPred = self._java_model.predict(x._to_java_object_rdd()).toJavaRDD()
- jpyrdd = self._sc._jvm.PythonRDD.javaToPython(jPred)
+ jPred = self._java_model.predict(_to_java_object_rdd(x)).toJavaRDD()
+ jpyrdd = self._sc._jvm.SerDe.javaToPython(jPred)
return RDD(jpyrdd, self._sc, BatchedSerializer(ser, 1024))
else:
@@ -104,7 +104,7 @@ class DecisionTree(object):
first = data.first()
assert isinstance(first, LabeledPoint), "the data should be RDD of LabeledPoint"
sc = data.context
- jrdd = data._to_java_object_rdd()
+ jrdd = _to_java_object_rdd(data)
cfiMap = MapConverter().convert(categoricalFeaturesInfo,
sc._gateway._gateway_client)
model = sc._jvm.PythonMLLibAPI().trainDecisionTreeModel(
diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py
index 1357fd4fbc..84b39a4861 100644
--- a/python/pyspark/mllib/util.py
+++ b/python/pyspark/mllib/util.py
@@ -19,7 +19,7 @@ import numpy as np
import warnings
from pyspark.rdd import RDD
-from pyspark.serializers import BatchedSerializer, PickleSerializer
+from pyspark.serializers import AutoBatchedSerializer, PickleSerializer
from pyspark.mllib.linalg import Vectors, SparseVector, _convert_to_vector
from pyspark.mllib.regression import LabeledPoint
@@ -174,8 +174,8 @@ class MLUtils(object):
"""
minPartitions = minPartitions or min(sc.defaultParallelism, 2)
jrdd = sc._jvm.PythonMLLibAPI().loadLabeledPoints(sc._jsc, path, minPartitions)
- jpyrdd = sc._jvm.PythonRDD.javaToPython(jrdd)
- return RDD(jpyrdd, sc, BatchedSerializer(PickleSerializer()))
+ jpyrdd = sc._jvm.SerDe.javaToPython(jrdd)
+ return RDD(jpyrdd, sc, AutoBatchedSerializer(PickleSerializer()))
def _test():
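To round out the picture, a sketch of the reverse (Java to Python) direction that the hunks above switch from PythonRDD.javaToPython to SerDe.javaToPython. Illustrative only: `sc` is an existing SparkContext and `jresult` stands in for any JavaRDD handle returned by a PythonMLLibAPI call, as in the recommendation.py hunk.

    from pyspark.rdd import RDD
    from pyspark.serializers import AutoBatchedSerializer, PickleSerializer

    # sc: an existing SparkContext; jresult: a hypothetical py4j JavaRDD handle
    # returned by the JVM (e.g. from PythonMLLibAPI), as in recommendation.py.
    # SerDe.javaToPython pickles the Java objects via MLlib's SerDe, whose
    # custom picklers are registered on demand, so it also works on executors.
    pyrdd = RDD(sc._jvm.SerDe.javaToPython(jresult), sc,
                AutoBatchedSerializer(PickleSerializer()))
    predictions = pyrdd.collect()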