Diffstat (limited to 'python')
 python/pyspark/context.py     | 4 ++--
 python/pyspark/serializers.py | 6 +++---
 python/pyspark/worker.py      | 8 ++++----
 3 files changed, 9 insertions(+), 9 deletions(-)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index f955aad7a4..f318b5d9a7 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -27,7 +27,7 @@ from pyspark.broadcast import Broadcast
 from pyspark.conf import SparkConf
 from pyspark.files import SparkFiles
 from pyspark.java_gateway import launch_gateway
-from pyspark.serializers import PickleSerializer, BatchedSerializer, MUTF8Deserializer
+from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
 from pyspark.storagelevel import StorageLevel
 from pyspark.rdd import RDD
 
@@ -234,7 +234,7 @@ class SparkContext(object):
         """
         minSplits = minSplits or min(self.defaultParallelism, 2)
         return RDD(self._jsc.textFile(name, minSplits), self,
-                   MUTF8Deserializer())
+                   UTF8Deserializer())
 
     def _checkpointFile(self, name, input_deserializer):
         jrdd = self._jsc.checkpointFile(name)
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 2a500ab919..8c6ad79059 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -261,13 +261,13 @@ class MarshalSerializer(FramedSerializer):
     loads = marshal.loads
 
 
-class MUTF8Deserializer(Serializer):
+class UTF8Deserializer(Serializer):
     """
-    Deserializes streams written by Java's DataOutputStream.writeUTF().
+    Deserializes streams written by getBytes.
     """
 
     def loads(self, stream):
-        length = struct.unpack('>H', stream.read(2))[0]
+        length = read_int(stream)
         return stream.read(length).decode('utf8')
 
     def load_stream(self, stream):
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index d77981f61f..4be4063dcf 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -30,11 +30,11 @@ from pyspark.broadcast import Broadcast, _broadcastRegistry
 from pyspark.cloudpickle import CloudPickler
 from pyspark.files import SparkFiles
 from pyspark.serializers import write_with_length, write_int, read_long, \
-    write_long, read_int, SpecialLengths, MUTF8Deserializer, PickleSerializer
+    write_long, read_int, SpecialLengths, UTF8Deserializer, PickleSerializer
 
 
 pickleSer = PickleSerializer()
-mutf8_deserializer = MUTF8Deserializer()
+utf8_deserializer = UTF8Deserializer()
 
 
 def report_times(outfile, boot, init, finish):
@@ -51,7 +51,7 @@ def main(infile, outfile):
         return
 
     # fetch name of workdir
-    spark_files_dir = mutf8_deserializer.loads(infile)
+    spark_files_dir = utf8_deserializer.loads(infile)
     SparkFiles._root_directory = spark_files_dir
     SparkFiles._is_running_on_worker = True
 
@@ -66,7 +66,7 @@ def main(infile, outfile):
     sys.path.append(spark_files_dir)  # *.py files that were added will be copied here
     num_python_includes = read_int(infile)
     for _ in range(num_python_includes):
-        filename = utf8_deserializer.loads(infile)
+        filename = utf8_deserializer.loads(infile)
        sys.path.append(os.path.join(spark_files_dir, filename))
 
     command = pickleSer._read_with_length(infile)
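
The substance of the patch is a change to string framing in the Python worker protocol: the old MUTF8Deserializer parsed the 2-byte unsigned length prefix produced by Java's DataOutputStream.writeUTF() (modified UTF-8, capped at 65535 bytes), while the new UTF8Deserializer reads a 4-byte int length followed by standard UTF-8 bytes (the "getBytes" in the new docstring), consistent with the read_int/write_with_length helpers already imported in worker.py. Below is a minimal sketch of the two wire formats, not the pyspark implementation; the read_int helper mirrors pyspark.serializers.read_int, and the payload is hypothetical:

import struct
from io import BytesIO

def read_int(stream):
    # 4-byte big-endian signed int, mirroring pyspark.serializers.read_int.
    return struct.unpack('>i', stream.read(4))[0]

def read_mutf8(stream):
    # Old framing: DataOutputStream.writeUTF() emits a 2-byte unsigned
    # length prefix, so a framed string cannot exceed 65535 bytes.
    length = struct.unpack('>H', stream.read(2))[0]
    return stream.read(length).decode('utf8')

def read_utf8(stream):
    # New framing: 4-byte int length prefix, then standard UTF-8 bytes.
    length = read_int(stream)
    return stream.read(length).decode('utf8')

# Hypothetical round trip over both wire formats:
payload = u'spark'.encode('utf8')
old_wire = struct.pack('>H', len(payload)) + payload
new_wire = struct.pack('>i', len(payload)) + payload
assert read_mutf8(BytesIO(old_wire)) == u'spark'
assert read_utf8(BytesIO(new_wire)) == u'spark'

The 4-byte framing lifts the 64 KB cap and avoids the modified-UTF-8 encoding of writeUTF(), which the old loads() decoded as plain 'utf8' anyway.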