diff options
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r-- | python/pyspark/rdd.py | 12 |
1 files changed, 6 insertions, 6 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 6691c30519..062f44f81e 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -27,9 +27,8 @@ from subprocess import Popen, PIPE from tempfile import NamedTemporaryFile from threading import Thread -from pyspark import cloudpickle from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ - BatchedSerializer, pack_long + BatchedSerializer, CloudPickleSerializer, pack_long from pyspark.join import python_join, python_left_outer_join, \ python_right_outer_join, python_cogroup from pyspark.statcounter import StatCounter @@ -970,8 +969,8 @@ class PipelinedRDD(RDD): serializer = NoOpSerializer() else: serializer = self.ctx.serializer - cmds = [self.func, self._prev_jrdd_deserializer, serializer] - pipe_command = ' '.join(b64enc(cloudpickle.dumps(f)) for f in cmds) + command = (self.func, self._prev_jrdd_deserializer, serializer) + pickled_command = CloudPickleSerializer()._dumps(command) broadcast_vars = ListConverter().convert( [x._jbroadcast for x in self.ctx._pickled_broadcast_vars], self.ctx._gateway._gateway_client) @@ -982,8 +981,9 @@ class PipelinedRDD(RDD): includes = ListConverter().convert(self.ctx._python_includes, self.ctx._gateway._gateway_client) python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), - pipe_command, env, includes, self.preservesPartitioning, self.ctx.pythonExec, - broadcast_vars, self.ctx._javaAccumulator, class_manifest) + bytearray(pickled_command), env, includes, self.preservesPartitioning, + self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator, + class_manifest) self._jrdd_val = python_rdd.asJavaRDD() return self._jrdd_val |