aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/rdd.py
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@apache.org>2013-11-10 12:58:28 -0800
committerJosh Rosen <joshrosen@apache.org>2013-11-10 16:46:00 -0800
commitffa5bedf46fbc89ad5c5658f3b423dfff49b70f0 (patch)
tree972ab8bb7b02ee9903a524c28f24c9399c30d4fd /python/pyspark/rdd.py
parentcbb7f04aef2220ece93dea9f3fa98b5db5f270d6 (diff)
downloadspark-ffa5bedf46fbc89ad5c5658f3b423dfff49b70f0.tar.gz
spark-ffa5bedf46fbc89ad5c5658f3b423dfff49b70f0.tar.bz2
spark-ffa5bedf46fbc89ad5c5658f3b423dfff49b70f0.zip
Send PySpark commands as bytes insetad of strings.
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--python/pyspark/rdd.py12
1 files changed, 6 insertions, 6 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 6691c30519..062f44f81e 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -27,9 +27,8 @@ from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from threading import Thread
-from pyspark import cloudpickle
from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
- BatchedSerializer, pack_long
+ BatchedSerializer, CloudPickleSerializer, pack_long
from pyspark.join import python_join, python_left_outer_join, \
python_right_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
@@ -970,8 +969,8 @@ class PipelinedRDD(RDD):
serializer = NoOpSerializer()
else:
serializer = self.ctx.serializer
- cmds = [self.func, self._prev_jrdd_deserializer, serializer]
- pipe_command = ' '.join(b64enc(cloudpickle.dumps(f)) for f in cmds)
+ command = (self.func, self._prev_jrdd_deserializer, serializer)
+ pickled_command = CloudPickleSerializer()._dumps(command)
broadcast_vars = ListConverter().convert(
[x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
self.ctx._gateway._gateway_client)
@@ -982,8 +981,9 @@ class PipelinedRDD(RDD):
includes = ListConverter().convert(self.ctx._python_includes,
self.ctx._gateway._gateway_client)
python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
- pipe_command, env, includes, self.preservesPartitioning, self.ctx.pythonExec,
- broadcast_vars, self.ctx._javaAccumulator, class_manifest)
+ bytearray(pickled_command), env, includes, self.preservesPartitioning,
+ self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator,
+ class_manifest)
self._jrdd_val = python_rdd.asJavaRDD()
return self._jrdd_val