author     Davies Liu <davies@databricks.com>     2014-11-03 23:56:14 -0800
committer  Josh Rosen <joshrosen@databricks.com>  2014-11-03 23:56:37 -0800
commit     786e75b33f0bc1445bfc289fe4b62407cb79026e (patch)
tree       60f8733cb3577a52fcca2995a510b926ceefcc29 /python/pyspark
parent     8395e8fbdf23bef286ec68a4bbadcc448b504c2c (diff)
[SPARK-3886] [PySpark] simplify serializer, use AutoBatchedSerializer by default.
This PR simplifies the serializer: always use a batched serializer (AutoBatchedSerializer by default), even when the batch size is 1.

Author: Davies Liu <davies@databricks.com>

This patch had conflicts when merged, resolved by
Committer: Josh Rosen <joshrosen@databricks.com>

Closes #2920 from davies/fix_autobatch and squashes the following commits:

e544ef9 [Davies Liu] revert unrelated change
6880b14 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
1d557fc [Davies Liu] fix tests
8180907 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
76abdce [Davies Liu] clean up
53fa60b [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
d7ac751 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
2cc2497 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
b4292ce [Davies Liu] fix bug in master
d79744c [Davies Liu] recover hive tests
be37ece [Davies Liu] refactor
eb3938d [Davies Liu] refactor serializer in scala
8d77ef2 [Davies Liu] simplify serializer, use AutoBatchedSerializer by default.

(cherry picked from commit e4f42631a68b473ce706429915f3f08042af2119)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
Diffstat (limited to 'python/pyspark')
-rw-r--r--  python/pyspark/context.py               |  58
-rw-r--r--  python/pyspark/mllib/common.py          |   2
-rw-r--r--  python/pyspark/mllib/recommendation.py  |   2
-rw-r--r--  python/pyspark/rdd.py                   |  91
-rw-r--r--  python/pyspark/serializers.py           |  36
-rw-r--r--  python/pyspark/shuffle.py               |   7
-rw-r--r--  python/pyspark/sql.py                   |  18
-rw-r--r--  python/pyspark/tests.py                 |  66
8 files changed, 94 insertions, 186 deletions
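
For reference, a minimal sketch of the serializer selection this patch moves into
SparkContext.__init__ (batchSize=0 now means "choose the batch size automatically").
The helper name choose_serializer is only for illustration; the real logic is inline
in the constructor shown in the diff below:

    from pyspark.serializers import (PickleSerializer, BatchedSerializer,
                                     AutoBatchedSerializer)

    def choose_serializer(unbatched=PickleSerializer(), batchSize=0):
        # batchSize == 0: AutoBatchedSerializer picks the batch size from the
        # measured size of the serialized objects; any other value fixes the
        # batch size (-1 still means an unlimited batch).
        if batchSize == 0:
            return AutoBatchedSerializer(unbatched)
        return BatchedSerializer(unbatched, batchSize)

    # The default SparkContext(...) therefore serializes RDD elements with
    # AutoBatchedSerializer(PickleSerializer()).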
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 5f8dcedb1e..a0e4821728 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -63,7 +63,6 @@ class SparkContext(object):
_active_spark_context = None
_lock = Lock()
_python_includes = None # zip and egg files that need to be added to PYTHONPATH
- _default_batch_size_for_serialized_input = 10
def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,
@@ -115,9 +114,7 @@ class SparkContext(object):
self._conf = conf or SparkConf(_jvm=self._jvm)
self._batchSize = batchSize # -1 represents an unlimited batch size
self._unbatched_serializer = serializer
- if batchSize == 1:
- self.serializer = self._unbatched_serializer
- elif batchSize == 0:
+ if batchSize == 0:
self.serializer = AutoBatchedSerializer(self._unbatched_serializer)
else:
self.serializer = BatchedSerializer(self._unbatched_serializer,
@@ -305,12 +302,8 @@ class SparkContext(object):
# Make sure we distribute data evenly if it's smaller than self.batchSize
if "__len__" not in dir(c):
c = list(c) # Make it a list so we can compute its length
- batchSize = min(len(c) // numSlices, self._batchSize)
- if batchSize > 1:
- serializer = BatchedSerializer(self._unbatched_serializer,
- batchSize)
- else:
- serializer = self._unbatched_serializer
+ batchSize = max(1, min(len(c) // numSlices, self._batchSize))
+ serializer = BatchedSerializer(self._unbatched_serializer, batchSize)
serializer.dump_stream(c, tempFile)
tempFile.close()
readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
@@ -328,8 +321,7 @@ class SparkContext(object):
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
"""
minPartitions = minPartitions or self.defaultMinPartitions
- return RDD(self._jsc.objectFile(name, minPartitions), self,
- BatchedSerializer(PickleSerializer()))
+ return RDD(self._jsc.objectFile(name, minPartitions), self)
def textFile(self, name, minPartitions=None, use_unicode=True):
"""
@@ -405,7 +397,7 @@ class SparkContext(object):
return jm
def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None,
- valueConverter=None, minSplits=None, batchSize=None):
+ valueConverter=None, minSplits=None, batchSize=0):
"""
Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -427,17 +419,15 @@ class SparkContext(object):
:param minSplits: minimum splits in dataset
(default min(2, sc.defaultParallelism))
:param batchSize: The number of Python objects represented as a single
- Java object. (default sc._default_batch_size_for_serialized_input)
+ Java object. (default 0, choose batchSize automatically)
"""
minSplits = minSplits or min(self.defaultParallelism, 2)
- batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
- ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass,
keyConverter, valueConverter, minSplits, batchSize)
- return RDD(jrdd, self, ser)
+ return RDD(jrdd, self)
def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
- valueConverter=None, conf=None, batchSize=None):
+ valueConverter=None, conf=None, batchSize=0):
"""
Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -458,18 +448,16 @@ class SparkContext(object):
:param conf: Hadoop configuration, passed in as a dict
(None by default)
:param batchSize: The number of Python objects represented as a single
- Java object. (default sc._default_batch_size_for_serialized_input)
+ Java object. (default 0, choose batchSize automatically)
"""
jconf = self._dictToJavaMap(conf)
- batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
- ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,
valueClass, keyConverter, valueConverter,
jconf, batchSize)
- return RDD(jrdd, self, ser)
+ return RDD(jrdd, self)
def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
- valueConverter=None, conf=None, batchSize=None):
+ valueConverter=None, conf=None, batchSize=0):
"""
Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
Hadoop configuration, which is passed in as a Python dict.
@@ -487,18 +475,16 @@ class SparkContext(object):
:param conf: Hadoop configuration, passed in as a dict
(None by default)
:param batchSize: The number of Python objects represented as a single
- Java object. (default sc._default_batch_size_for_serialized_input)
+ Java object. (default 0, choose batchSize automatically)
"""
jconf = self._dictToJavaMap(conf)
- batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
- ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
valueClass, keyConverter, valueConverter,
jconf, batchSize)
- return RDD(jrdd, self, ser)
+ return RDD(jrdd, self)
def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
- valueConverter=None, conf=None, batchSize=None):
+ valueConverter=None, conf=None, batchSize=0):
"""
Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -519,18 +505,16 @@ class SparkContext(object):
:param conf: Hadoop configuration, passed in as a dict
(None by default)
:param batchSize: The number of Python objects represented as a single
- Java object. (default sc._default_batch_size_for_serialized_input)
+ Java object. (default 0, choose batchSize automatically)
"""
jconf = self._dictToJavaMap(conf)
- batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
- ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
valueClass, keyConverter, valueConverter,
jconf, batchSize)
- return RDD(jrdd, self, ser)
+ return RDD(jrdd, self)
def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
- valueConverter=None, conf=None, batchSize=None):
+ valueConverter=None, conf=None, batchSize=0):
"""
Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
Hadoop configuration, which is passed in as a Python dict.
@@ -548,15 +532,13 @@ class SparkContext(object):
:param conf: Hadoop configuration, passed in as a dict
(None by default)
:param batchSize: The number of Python objects represented as a single
- Java object. (default sc._default_batch_size_for_serialized_input)
+ Java object. (default 0, choose batchSize automatically)
"""
jconf = self._dictToJavaMap(conf)
- batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
- ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass,
valueClass, keyConverter, valueConverter,
jconf, batchSize)
- return RDD(jrdd, self, ser)
+ return RDD(jrdd, self)
def _checkpointFile(self, name, input_deserializer):
jrdd = self._jsc.checkpointFile(name)
@@ -836,7 +818,7 @@ def _test():
import doctest
import tempfile
globs = globals().copy()
- globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+ globs['sc'] = SparkContext('local[4]', 'PythonTest')
globs['tempdir'] = tempfile.mkdtemp()
atexit.register(lambda: shutil.rmtree(globs['tempdir']))
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
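
With the context.py changes above, the Hadoop-format readers no longer build a
Python-side serializer; passing batchSize=0 (now the default) lets the JVM side pick
the batch size and the returned RDD falls back to the new default deserializer.
A short usage sketch (the file path is hypothetical):

    sc = SparkContext('local[4]', 'example')           # batchSize defaults to 0 (auto)
    rdd = sc.sequenceFile("hdfs:///tmp/example.seq")   # no batchSize argument needed
    # rdd is constructed as RDD(jrdd, sc), i.e. deserialized with the default
    # AutoBatchedSerializer(PickleSerializer()).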
diff --git a/python/pyspark/mllib/common.py b/python/pyspark/mllib/common.py
index 76864d8163..dbe5f698b7 100644
--- a/python/pyspark/mllib/common.py
+++ b/python/pyspark/mllib/common.py
@@ -96,7 +96,7 @@ def _java2py(sc, r):
if clsName == 'JavaRDD':
jrdd = sc._jvm.SerDe.javaToPython(r)
- return RDD(jrdd, sc, AutoBatchedSerializer(PickleSerializer()))
+ return RDD(jrdd, sc)
elif isinstance(r, (JavaArray, JavaList)) or clsName in _picklable_classes:
r = sc._jvm.SerDe.dumps(r)
diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py
index 6b32af07c9..e8b998414d 100644
--- a/python/pyspark/mllib/recommendation.py
+++ b/python/pyspark/mllib/recommendation.py
@@ -117,7 +117,7 @@ def _test():
import doctest
import pyspark.mllib.recommendation
globs = pyspark.mllib.recommendation.__dict__.copy()
- globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+ globs['sc'] = SparkContext('local[4]', 'PythonTest')
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
if failure_count:
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 4f025b9f11..879655dc53 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -120,7 +120,7 @@ class RDD(object):
operated on in parallel.
"""
- def __init__(self, jrdd, ctx, jrdd_deserializer):
+ def __init__(self, jrdd, ctx, jrdd_deserializer=AutoBatchedSerializer(PickleSerializer())):
self._jrdd = jrdd
self.is_cached = False
self.is_checkpointed = False
@@ -129,12 +129,8 @@ class RDD(object):
self._id = jrdd.id()
self._partitionFunc = None
- def _toPickleSerialization(self):
- if (self._jrdd_deserializer == PickleSerializer() or
- self._jrdd_deserializer == BatchedSerializer(PickleSerializer())):
- return self
- else:
- return self._reserialize(BatchedSerializer(PickleSerializer(), 10))
+ def _pickled(self):
+ return self._reserialize(AutoBatchedSerializer(PickleSerializer()))
def id(self):
"""
@@ -446,12 +442,11 @@ class RDD(object):
def _reserialize(self, serializer=None):
serializer = serializer or self.ctx.serializer
- if self._jrdd_deserializer == serializer:
- return self
- else:
- converted = self.map(lambda x: x, preservesPartitioning=True)
- converted._jrdd_deserializer = serializer
- return converted
+ if self._jrdd_deserializer != serializer:
+ if not isinstance(self, PipelinedRDD):
+ self = self.map(lambda x: x, preservesPartitioning=True)
+ self._jrdd_deserializer = serializer
+ return self
def __add__(self, other):
"""
@@ -1120,9 +1115,8 @@ class RDD(object):
:param valueConverter: (None by default)
"""
jconf = self.ctx._dictToJavaMap(conf)
- pickledRDD = self._toPickleSerialization()
- batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
- self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf,
+ pickledRDD = self._pickled()
+ self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, True, jconf,
keyConverter, valueConverter, True)
def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
@@ -1147,9 +1141,8 @@ class RDD(object):
:param conf: Hadoop job configuration, passed in as a dict (None by default)
"""
jconf = self.ctx._dictToJavaMap(conf)
- pickledRDD = self._toPickleSerialization()
- batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
- self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, batched, path,
+ pickledRDD = self._pickled()
+ self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, True, path,
outputFormatClass,
keyClass, valueClass,
keyConverter, valueConverter, jconf)
@@ -1166,9 +1159,8 @@ class RDD(object):
:param valueConverter: (None by default)
"""
jconf = self.ctx._dictToJavaMap(conf)
- pickledRDD = self._toPickleSerialization()
- batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
- self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf,
+ pickledRDD = self._pickled()
+ self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, True, jconf,
keyConverter, valueConverter, False)
def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
@@ -1195,9 +1187,8 @@ class RDD(object):
:param compressionCodecClass: (None by default)
"""
jconf = self.ctx._dictToJavaMap(conf)
- pickledRDD = self._toPickleSerialization()
- batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
- self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, batched, path,
+ pickledRDD = self._pickled()
+ self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, True, path,
outputFormatClass,
keyClass, valueClass,
keyConverter, valueConverter,
@@ -1215,9 +1206,8 @@ class RDD(object):
:param path: path to sequence file
:param compressionCodecClass: (None by default)
"""
- pickledRDD = self._toPickleSerialization()
- batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
- self.ctx._jvm.PythonRDD.saveAsSequenceFile(pickledRDD._jrdd, batched,
+ pickledRDD = self._pickled()
+ self.ctx._jvm.PythonRDD.saveAsSequenceFile(pickledRDD._jrdd, True,
path, compressionCodecClass)
def saveAsPickleFile(self, path, batchSize=10):
@@ -1232,8 +1222,11 @@ class RDD(object):
>>> sorted(sc.pickleFile(tmpFile.name, 5).collect())
[1, 2, 'rdd', 'spark']
"""
- self._reserialize(BatchedSerializer(PickleSerializer(),
- batchSize))._jrdd.saveAsObjectFile(path)
+ if batchSize == 0:
+ ser = AutoBatchedSerializer(PickleSerializer())
+ else:
+ ser = BatchedSerializer(PickleSerializer(), batchSize)
+ self._reserialize(ser)._jrdd.saveAsObjectFile(path)
def saveAsTextFile(self, path):
"""
@@ -1774,13 +1767,10 @@ class RDD(object):
>>> x.zip(y).collect()
[(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
"""
- if self.getNumPartitions() != other.getNumPartitions():
- raise ValueError("Can only zip with RDD which has the same number of partitions")
-
def get_batch_size(ser):
if isinstance(ser, BatchedSerializer):
return ser.batchSize
- return 0
+ return 1
def batch_as(rdd, batchSize):
ser = rdd._jrdd_deserializer
@@ -1790,12 +1780,16 @@ class RDD(object):
my_batch = get_batch_size(self._jrdd_deserializer)
other_batch = get_batch_size(other._jrdd_deserializer)
- if my_batch != other_batch:
- # use the greatest batchSize to batch the other one.
- if my_batch > other_batch:
- other = batch_as(other, my_batch)
- else:
- self = batch_as(self, other_batch)
+ # use the smallest batchSize for both of them
+ batchSize = min(my_batch, other_batch)
+ if batchSize <= 0:
+ # auto batched or unlimited
+ batchSize = 100
+ other = batch_as(other, batchSize)
+ self = batch_as(self, batchSize)
+
+ if self.getNumPartitions() != other.getNumPartitions():
+ raise ValueError("Can only zip with RDD which has the same number of partitions")
# There will be an Exception in JVM if there are different number
# of items in each partitions.
@@ -1934,25 +1928,14 @@ class RDD(object):
return values.collect()
- def _is_pickled(self):
- """ Return this RDD is serialized by Pickle or not. """
- der = self._jrdd_deserializer
- if isinstance(der, PickleSerializer):
- return True
- if isinstance(der, BatchedSerializer) and isinstance(der.serializer, PickleSerializer):
- return True
- return False
-
def _to_java_object_rdd(self):
""" Return an JavaRDD of Object by unpickling
It will convert each Python object into Java object by Pyrolite, whenever the
RDD is serialized in batch or not.
"""
- rdd = self._reserialize(AutoBatchedSerializer(PickleSerializer())) \
- if not self._is_pickled() else self
- is_batch = isinstance(rdd._jrdd_deserializer, BatchedSerializer)
- return self.ctx._jvm.PythonRDD.pythonToJava(rdd._jrdd, is_batch)
+ rdd = self._pickled()
+ return self.ctx._jvm.SerDeUtil.pythonToJava(rdd._jrdd, True)
def countApprox(self, timeout, confidence=0.95):
"""
@@ -2132,7 +2115,7 @@ def _test():
globs = globals().copy()
# The small batch size here ensures that we see multiple batches,
# even in these small test examples:
- globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+ globs['sc'] = SparkContext('local[4]', 'PythonTest')
(failure_count, test_count) = doctest.testmod(
globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
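
The saveAsPickleFile hunk above keeps the old fixed default (batchSize=10) but now
also accepts batchSize=0 for automatic batching via AutoBatchedSerializer. A hedged
usage sketch (output paths are hypothetical):

    rdd = sc.parallelize(range(1000))
    rdd.saveAsPickleFile("/tmp/pickled-fixed")               # BatchedSerializer(PickleSerializer(), 10)
    rdd.saveAsPickleFile("/tmp/pickled-auto", batchSize=0)   # AutoBatchedSerializer(PickleSerializer())
    sorted(sc.pickleFile("/tmp/pickled-auto").collect())[:5] # [0, 1, 2, 3, 4]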
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 904bd9f265..d597cbf94e 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -33,9 +33,8 @@ The serializer is chosen when creating L{SparkContext}:
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
>>> sc.stop()
-By default, PySpark serialize objects in batches; the batch size can be
-controlled through SparkContext's C{batchSize} parameter
-(the default size is 1024 objects):
+PySpark serialize objects in batches; By default, the batch size is chosen based
+on the size of objects, also configurable by SparkContext's C{batchSize} parameter:
>>> sc = SparkContext('local', 'test', batchSize=2)
>>> rdd = sc.parallelize(range(16), 4).map(lambda x: x)
@@ -48,16 +47,6 @@ which contains two batches of two objects:
>>> rdd._jrdd.count()
8L
>>> sc.stop()
-
-A batch size of -1 uses an unlimited batch size, and a size of 1 disables
-batching:
-
->>> sc = SparkContext('local', 'test', batchSize=1)
->>> rdd = sc.parallelize(range(16), 4).map(lambda x: x)
->>> rdd.glom().collect()
-[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
->>> rdd._jrdd.count()
-16L
"""
import cPickle
@@ -73,7 +62,7 @@ import itertools
from pyspark import cloudpickle
-__all__ = ["PickleSerializer", "MarshalSerializer"]
+__all__ = ["PickleSerializer", "MarshalSerializer", "UTF8Deserializer"]
class SpecialLengths(object):
@@ -113,7 +102,7 @@ class Serializer(object):
return not self.__eq__(other)
def __repr__(self):
- return "<%s object>" % self.__class__.__name__
+ return "%s()" % self.__class__.__name__
def __hash__(self):
return hash(str(self))
@@ -181,6 +170,7 @@ class BatchedSerializer(Serializer):
"""
UNLIMITED_BATCH_SIZE = -1
+ UNKNOWN_BATCH_SIZE = 0
def __init__(self, serializer, batchSize=UNLIMITED_BATCH_SIZE):
self.serializer = serializer
@@ -213,10 +203,10 @@ class BatchedSerializer(Serializer):
def __eq__(self, other):
return (isinstance(other, BatchedSerializer) and
- other.serializer == self.serializer)
+ other.serializer == self.serializer and other.batchSize == self.batchSize)
def __repr__(self):
- return "BatchedSerializer<%s>" % str(self.serializer)
+ return "BatchedSerializer(%s, %d)" % (str(self.serializer), self.batchSize)
class AutoBatchedSerializer(BatchedSerializer):
@@ -225,7 +215,7 @@ class AutoBatchedSerializer(BatchedSerializer):
"""
def __init__(self, serializer, bestSize=1 << 16):
- BatchedSerializer.__init__(self, serializer, -1)
+ BatchedSerializer.__init__(self, serializer, self.UNKNOWN_BATCH_SIZE)
self.bestSize = bestSize
def dump_stream(self, iterator, stream):
@@ -248,10 +238,10 @@ class AutoBatchedSerializer(BatchedSerializer):
def __eq__(self, other):
return (isinstance(other, AutoBatchedSerializer) and
- other.serializer == self.serializer)
+ other.serializer == self.serializer and other.bestSize == self.bestSize)
def __str__(self):
- return "AutoBatchedSerializer<%s>" % str(self.serializer)
+ return "AutoBatchedSerializer(%s)" % str(self.serializer)
class CartesianDeserializer(FramedSerializer):
@@ -284,7 +274,7 @@ class CartesianDeserializer(FramedSerializer):
self.key_ser == other.key_ser and self.val_ser == other.val_ser)
def __repr__(self):
- return "CartesianDeserializer<%s, %s>" % \
+ return "CartesianDeserializer(%s, %s)" % \
(str(self.key_ser), str(self.val_ser))
@@ -311,7 +301,7 @@ class PairDeserializer(CartesianDeserializer):
self.key_ser == other.key_ser and self.val_ser == other.val_ser)
def __repr__(self):
- return "PairDeserializer<%s, %s>" % (str(self.key_ser), str(self.val_ser))
+ return "PairDeserializer(%s, %s)" % (str(self.key_ser), str(self.val_ser))
class NoOpSerializer(FramedSerializer):
@@ -430,7 +420,7 @@ class MarshalSerializer(FramedSerializer):
class AutoSerializer(FramedSerializer):
"""
- Choose marshal or cPickle as serialization protocol autumatically
+ Choose marshal or cPickle as serialization protocol automatically
"""
def __init__(self):
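
The AutoBatchedSerializer touched above sizes batches from the measured length of
each serialized chunk rather than from a fixed object count. Below is a standalone
sketch of that idea under an assumed doubling/shrinking heuristic; auto_batched_dump
and write_chunk are illustrative names, not PySpark API:

    import itertools
    import pickle

    def auto_batched_dump(iterator, write_chunk, best_size=1 << 16):
        # Start with one object per batch; double the batch while pickled
        # chunks stay under best_size, shrink it if they grow far beyond it.
        batch = 1
        iterator = iter(iterator)
        while True:
            values = list(itertools.islice(iterator, batch))
            if not values:
                break
            chunk = pickle.dumps(values, protocol=2)
            write_chunk(chunk)
            if len(chunk) < best_size:
                batch *= 2
            elif len(chunk) > best_size * 10 and batch > 1:
                batch //= 2

    # Example: collect the chunks into a list instead of writing to a stream.
    chunks = []
    auto_batched_dump(iter(range(10000)), chunks.append)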
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index d57a802e47..5931e923c2 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -25,7 +25,7 @@ import itertools
import random
import pyspark.heapq3 as heapq
-from pyspark.serializers import BatchedSerializer, PickleSerializer
+from pyspark.serializers import AutoBatchedSerializer, PickleSerializer
try:
import psutil
@@ -213,8 +213,7 @@ class ExternalMerger(Merger):
Merger.__init__(self, aggregator)
self.memory_limit = memory_limit
# default serializer is only used for tests
- self.serializer = serializer or \
- BatchedSerializer(PickleSerializer(), 1024)
+ self.serializer = serializer or AutoBatchedSerializer(PickleSerializer())
self.localdirs = localdirs or _get_local_dirs(str(id(self)))
# number of partitions when spill data into disks
self.partitions = partitions
@@ -470,7 +469,7 @@ class ExternalSorter(object):
def __init__(self, memory_limit, serializer=None):
self.memory_limit = memory_limit
self.local_dirs = _get_local_dirs("sort")
- self.serializer = serializer or BatchedSerializer(PickleSerializer(), 1024)
+ self.serializer = serializer or AutoBatchedSerializer(PickleSerializer())
def _get_path(self, n):
""" Choose one directory for spill by number n """
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index d16c18bc79..e5d62a466c 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -44,7 +44,8 @@ from py4j.protocol import Py4JError
from py4j.java_collections import ListConverter, MapConverter
from pyspark.rdd import RDD
-from pyspark.serializers import BatchedSerializer, PickleSerializer, CloudPickleSerializer
+from pyspark.serializers import BatchedSerializer, AutoBatchedSerializer, PickleSerializer, \
+ CloudPickleSerializer
from pyspark.storagelevel import StorageLevel
from pyspark.traceback_utils import SCCallSiteSync
@@ -1233,7 +1234,6 @@ class SQLContext(object):
self._sc = sparkContext
self._jsc = self._sc._jsc
self._jvm = self._sc._jvm
- self._pythonToJava = self._jvm.PythonRDD.pythonToJavaArray
self._scala_SQLContext = sqlContext
@property
@@ -1263,8 +1263,8 @@ class SQLContext(object):
"""
func = lambda _, it: imap(lambda x: f(*x), it)
command = (func, None,
- BatchedSerializer(PickleSerializer(), 1024),
- BatchedSerializer(PickleSerializer(), 1024))
+ AutoBatchedSerializer(PickleSerializer()),
+ AutoBatchedSerializer(PickleSerializer()))
ser = CloudPickleSerializer()
pickled_command = ser.dumps(command)
if len(pickled_command) > (1 << 20): # 1M
@@ -1443,8 +1443,7 @@ class SQLContext(object):
converter = _python_to_sql_converter(schema)
rdd = rdd.map(converter)
- batched = isinstance(rdd._jrdd_deserializer, BatchedSerializer)
- jrdd = self._pythonToJava(rdd._jrdd, batched)
+ jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
srdd = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
return SchemaRDD(srdd.toJavaSchemaRDD(), self)
@@ -1841,7 +1840,7 @@ class SchemaRDD(RDD):
self.is_checkpointed = False
self.ctx = self.sql_ctx._sc
# the _jrdd is created by javaToPython(), serialized by pickle
- self._jrdd_deserializer = BatchedSerializer(PickleSerializer())
+ self._jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
@property
def _jrdd(self):
@@ -2071,16 +2070,13 @@ class SchemaRDD(RDD):
def _test():
import doctest
- from array import array
from pyspark.context import SparkContext
# let doctest run in pyspark.sql, so DataTypes can be picklable
import pyspark.sql
from pyspark.sql import Row, SQLContext
from pyspark.tests import ExamplePoint, ExamplePointUDT
globs = pyspark.sql.__dict__.copy()
- # The small batch size here ensures that we see multiple batches,
- # even in these small test examples:
- sc = SparkContext('local[4]', 'PythonTest', batchSize=2)
+ sc = SparkContext('local[4]', 'PythonTest')
globs['sc'] = sc
globs['sqlCtx'] = SQLContext(sc)
globs['rdd'] = sc.parallelize(
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index e947b09468..7e61b017ef 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -242,7 +242,7 @@ class PySparkTestCase(unittest.TestCase):
def setUp(self):
self._old_sys_path = list(sys.path)
class_name = self.__class__.__name__
- self.sc = SparkContext('local[4]', class_name, batchSize=2)
+ self.sc = SparkContext('local[4]', class_name)
def tearDown(self):
self.sc.stop()
@@ -253,7 +253,7 @@ class ReusedPySparkTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
- cls.sc = SparkContext('local[4]', cls.__name__, batchSize=2)
+ cls.sc = SparkContext('local[4]', cls.__name__)
@classmethod
def tearDownClass(cls):
@@ -671,7 +671,7 @@ class ProfilerTests(PySparkTestCase):
self._old_sys_path = list(sys.path)
class_name = self.__class__.__name__
conf = SparkConf().set("spark.python.profile", "true")
- self.sc = SparkContext('local[4]', class_name, batchSize=2, conf=conf)
+ self.sc = SparkContext('local[4]', class_name, conf=conf)
def test_profiler(self):
@@ -1012,16 +1012,19 @@ class InputFormatTests(ReusedPySparkTestCase):
clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/",
"org.apache.hadoop.io.Text",
"org.apache.spark.api.python.TestWritable").collect())
- ec = (u'1',
- {u'__class__': u'org.apache.spark.api.python.TestWritable',
- u'double': 54.0, u'int': 123, u'str': u'test1'})
- self.assertEqual(clazz[0], ec)
+ cname = u'org.apache.spark.api.python.TestWritable'
+ ec = [(u'1', {u'__class__': cname, u'double': 1.0, u'int': 1, u'str': u'test1'}),
+ (u'2', {u'__class__': cname, u'double': 2.3, u'int': 2, u'str': u'test2'}),
+ (u'3', {u'__class__': cname, u'double': 3.1, u'int': 3, u'str': u'test3'}),
+ (u'4', {u'__class__': cname, u'double': 4.2, u'int': 4, u'str': u'test4'}),
+ (u'5', {u'__class__': cname, u'double': 5.5, u'int': 5, u'str': u'test56'})]
+ self.assertEqual(clazz, ec)
unbatched_clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/",
"org.apache.hadoop.io.Text",
"org.apache.spark.api.python.TestWritable",
- batchSize=1).collect())
- self.assertEqual(unbatched_clazz[0], ec)
+ ).collect())
+ self.assertEqual(unbatched_clazz, ec)
def test_oldhadoop(self):
basepath = self.tempdir.name
@@ -1341,51 +1344,6 @@ class OutputFormatTests(ReusedPySparkTestCase):
result5 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newdataset").collect())
self.assertEqual(result5, data)
- def test_unbatched_save_and_read(self):
- basepath = self.tempdir.name
- ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
- self.sc.parallelize(ei, len(ei)).saveAsSequenceFile(
- basepath + "/unbatched/")
-
- unbatched_sequence = sorted(self.sc.sequenceFile(
- basepath + "/unbatched/",
- batchSize=1).collect())
- self.assertEqual(unbatched_sequence, ei)
-
- unbatched_hadoopFile = sorted(self.sc.hadoopFile(
- basepath + "/unbatched/",
- "org.apache.hadoop.mapred.SequenceFileInputFormat",
- "org.apache.hadoop.io.IntWritable",
- "org.apache.hadoop.io.Text",
- batchSize=1).collect())
- self.assertEqual(unbatched_hadoopFile, ei)
-
- unbatched_newAPIHadoopFile = sorted(self.sc.newAPIHadoopFile(
- basepath + "/unbatched/",
- "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
- "org.apache.hadoop.io.IntWritable",
- "org.apache.hadoop.io.Text",
- batchSize=1).collect())
- self.assertEqual(unbatched_newAPIHadoopFile, ei)
-
- oldconf = {"mapred.input.dir": basepath + "/unbatched/"}
- unbatched_hadoopRDD = sorted(self.sc.hadoopRDD(
- "org.apache.hadoop.mapred.SequenceFileInputFormat",
- "org.apache.hadoop.io.IntWritable",
- "org.apache.hadoop.io.Text",
- conf=oldconf,
- batchSize=1).collect())
- self.assertEqual(unbatched_hadoopRDD, ei)
-
- newconf = {"mapred.input.dir": basepath + "/unbatched/"}
- unbatched_newAPIHadoopRDD = sorted(self.sc.newAPIHadoopRDD(
- "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
- "org.apache.hadoop.io.IntWritable",
- "org.apache.hadoop.io.Text",
- conf=newconf,
- batchSize=1).collect())
- self.assertEqual(unbatched_newAPIHadoopRDD, ei)
-
def test_malformed_RDD(self):
basepath = self.tempdir.name
# non-batch-serialized RDD[[(K, V)]] should be rejected