author    Josh Rosen <joshrosen@apache.org>  2014-01-28 19:50:26 -0800
committer Josh Rosen <joshrosen@apache.org>  2014-01-28 20:20:08 -0800
commit    1381fc72f7a34f690a98ab72cec8ffb61e0e564d
tree      8ae129c4b291b4b5589a77b919f508c4535fbf2c
parent    84670f2715392859624df290c1b52eb4ed4a9cb1
Switch from MUTF8 to UTF8 in PySpark serializers.
This fixes SPARK-1043, a bug introduced in 0.9.0 where PySpark couldn't serialize strings larger than 64kB. The fix was written by @tyro89 and @bouk in #512; this commit squashes and rebases their pull request to resolve merge conflicts.
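Why the old framing failed: Java's DataOutputStream.writeUTF() prefixes its payload with a 2-byte unsigned big-endian length, so nothing over 65535 bytes can be framed. A minimal Python sketch of the two framings (illustration only, not part of the patch):

    import struct

    payload = ('x' * 70000).encode('utf8')   # a string over the 64kB ceiling

    # The MUTF8-era framing: 2-byte unsigned length prefix.
    try:
        struct.pack('>H', len(payload))
    except struct.error:
        print("70000 bytes cannot be framed with '>H'")

    # The replacement framing: 4-byte signed length prefix (~2GB ceiling).
    frame = struct.pack('>i', len(payload)) + payload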
Diffstat (limited to 'python')
 python/pyspark/context.py     | 4 ++--
 python/pyspark/serializers.py | 6 +++---
 python/pyspark/worker.py      | 8 ++++----
 3 files changed, 9 insertions(+), 9 deletions(-)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index f955aad7a4..f318b5d9a7 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -27,7 +27,7 @@ from pyspark.broadcast import Broadcast
from pyspark.conf import SparkConf
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
-from pyspark.serializers import PickleSerializer, BatchedSerializer, MUTF8Deserializer
+from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
from pyspark.storagelevel import StorageLevel
from pyspark.rdd import RDD
@@ -234,7 +234,7 @@ class SparkContext(object):
"""
minSplits = minSplits or min(self.defaultParallelism, 2)
return RDD(self._jsc.textFile(name, minSplits), self,
- MUTF8Deserializer())
+ UTF8Deserializer())
def _checkpointFile(self, name, input_deserializer):
jrdd = self._jsc.checkpointFile(name)
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 2a500ab919..8c6ad79059 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -261,13 +261,13 @@ class MarshalSerializer(FramedSerializer):
loads = marshal.loads
-class MUTF8Deserializer(Serializer):
+class UTF8Deserializer(Serializer):
"""
- Deserializes streams written by Java's DataOutputStream.writeUTF().
+ Deserializes streams written by String.getBytes.
"""
def loads(self, stream):
- length = struct.unpack('>H', stream.read(2))[0]
+ length = read_int(stream)
return stream.read(length).decode('utf8')
def load_stream(self, stream):
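The new loads delegates length decoding to read_int, the framing helper already defined in pyspark.serializers (and imported by worker.py below). A sketch of its behavior, assuming the big-endian signed-int framing the diff implies rather than quoting the source verbatim:

    import struct

    def read_int(stream):
        # 4-byte big-endian signed length: raises the per-string ceiling
        # from 65535 bytes ('>H') to ~2GB.
        data = stream.read(4)
        if not data:
            raise EOFError
        return struct.unpack('!i', data)[0]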
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index d77981f61f..4be4063dcf 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -30,11 +30,11 @@ from pyspark.broadcast import Broadcast, _broadcastRegistry
from pyspark.cloudpickle import CloudPickler
from pyspark.files import SparkFiles
from pyspark.serializers import write_with_length, write_int, read_long, \
- write_long, read_int, SpecialLengths, MUTF8Deserializer, PickleSerializer
+ write_long, read_int, SpecialLengths, UTF8Deserializer, PickleSerializer
pickleSer = PickleSerializer()
-mutf8_deserializer = MUTF8Deserializer()
+utf8_deserializer = UTF8Deserializer()
def report_times(outfile, boot, init, finish):
@@ -51,7 +51,7 @@ def main(infile, outfile):
return
# fetch name of workdir
- spark_files_dir = mutf8_deserializer.loads(infile)
+ spark_files_dir = utf8_deserializer.loads(infile)
SparkFiles._root_directory = spark_files_dir
SparkFiles._is_running_on_worker = True
@@ -66,7 +66,7 @@ def main(infile, outfile):
sys.path.append(spark_files_dir) # *.py files that were added will be copied here
num_python_includes = read_int(infile)
for _ in range(num_python_includes):
- filename = mutf8_deserializer.loads(infile)
+ filename = utf8_deserializer.loads(infile)
sys.path.append(os.path.join(spark_files_dir, filename))
command = pickleSer._read_with_length(infile)
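Both ends of the worker pipe must agree on the new framing: the JVM writes a 4-byte length followed by raw UTF-8 bytes, and the worker reads it back with UTF8Deserializer.loads. A standalone round-trip check (write_utf8 is a hypothetical stand-in for the JVM-side writer, not Spark code):

    import struct
    from io import BytesIO

    def write_utf8(stream, s):
        # Hypothetical JVM-side framing: 4-byte big-endian length, then bytes.
        data = s.encode('utf8')
        stream.write(struct.pack('>i', len(data)) + data)

    def read_utf8(stream):
        # Mirrors the patched UTF8Deserializer.loads.
        length = struct.unpack('>i', stream.read(4))[0]
        return stream.read(length).decode('utf8')

    buf = BytesIO()
    big = 'x' * (1 << 20)            # 1MB: well past the old 64kB limit
    write_utf8(buf, big)
    buf.seek(0)
    assert read_utf8(buf) == big     # would fail under the old '>H' framing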