author    Davies Liu <davies@databricks.com>     2014-11-03 23:56:14 -0800
committer Josh Rosen <joshrosen@databricks.com>  2014-11-03 23:56:14 -0800
commit    e4f42631a68b473ce706429915f3f08042af2119
tree      557ff754b9936addfb9628bfcba462802ff6ec1c
parent    b671ce047d036b8923007902826038b01e836e8a
[SPARK-3886] [PySpark] simplify serializer, use AutoBatchedSerializer by default.
This PR simplifies the serializer: it always uses a batched serializer (AutoBatchedSerializer by default), even when the batch size is 1.

Author: Davies Liu <davies@databricks.com>

This patch had conflicts when merged, resolved by
Committer: Josh Rosen <joshrosen@databricks.com>

Closes #2920 from davies/fix_autobatch and squashes the following commits:

e544ef9 [Davies Liu] revert unrelated change
6880b14 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
1d557fc [Davies Liu] fix tests
8180907 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
76abdce [Davies Liu] clean up
53fa60b [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
d7ac751 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
2cc2497 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
b4292ce [Davies Liu] fix bug in master
d79744c [Davies Liu] recover hive tests
be37ece [Davies Liu] refactor
eb3938d [Davies Liu] refactor serializer in scala
8d77ef2 [Davies Liu] simplify serializer, use AutoBatchedSerializer by default.
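(Illustration, not part of the patch: a minimal sketch of the serializer-selection rule the commit converges on. choose_serializer is a hypothetical helper; the classes come from pyspark.serializers.)

    from pyspark.serializers import (PickleSerializer, BatchedSerializer,
                                     AutoBatchedSerializer)

    def choose_serializer(batch_size, unbatched=None):
        unbatched = unbatched or PickleSerializer()
        if batch_size == 0:
            # 0 means "auto": AutoBatchedSerializer tunes batches by byte size.
            return AutoBatchedSerializer(unbatched)
        # Everything else (including 1, and -1 for "unlimited") is batched;
        # the old unbatched special case for batch_size == 1 is gone.
        return BatchedSerializer(unbatched, batch_size)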
Diffstat (limited to 'python/pyspark/context.py')
-rw-r--r--  python/pyspark/context.py  |  58
1 file changed, 20 insertions(+), 38 deletions(-)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 5f8dcedb1e..a0e4821728 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -63,7 +63,6 @@ class SparkContext(object):
_active_spark_context = None
_lock = Lock()
_python_includes = None # zip and egg files that need to be added to PYTHONPATH
- _default_batch_size_for_serialized_input = 10
def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,
@@ -115,9 +114,7 @@ class SparkContext(object):
self._conf = conf or SparkConf(_jvm=self._jvm)
self._batchSize = batchSize # -1 represents an unlimited batch size
self._unbatched_serializer = serializer
- if batchSize == 1:
- self.serializer = self._unbatched_serializer
- elif batchSize == 0:
+ if batchSize == 0:
self.serializer = AutoBatchedSerializer(self._unbatched_serializer)
else:
self.serializer = BatchedSerializer(self._unbatched_serializer,
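(Illustration, not part of the patch: what the hunk above means for callers; assumes a runnable local Spark build.)

    from pyspark import SparkContext
    from pyspark.serializers import AutoBatchedSerializer, BatchedSerializer

    sc = SparkContext('local', 'demo')               # batchSize defaults to 0
    assert isinstance(sc.serializer, AutoBatchedSerializer)
    sc.stop()

    sc = SparkContext('local', 'demo', batchSize=1)  # formerly unbatched
    assert isinstance(sc.serializer, BatchedSerializer)  # now batched, size 1
    sc.stop()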
@@ -305,12 +302,8 @@ class SparkContext(object):
# Make sure we distribute data evenly if it's smaller than self.batchSize
if "__len__" not in dir(c):
c = list(c) # Make it a list so we can compute its length
- batchSize = min(len(c) // numSlices, self._batchSize)
- if batchSize > 1:
- serializer = BatchedSerializer(self._unbatched_serializer,
- batchSize)
- else:
- serializer = self._unbatched_serializer
+ batchSize = max(1, min(len(c) // numSlices, self._batchSize))
+ serializer = BatchedSerializer(self._unbatched_serializer, batchSize)
serializer.dump_stream(c, tempFile)
tempFile.close()
readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
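(Illustration, not part of the patch: a worked trace of the new batch-size arithmetic in parallelize, with illustrative values.)

    # batchSize = max(1, min(len(c) // numSlices, self._batchSize))
    # len(c)=1000, numSlices=4, _batchSize=64 -> min(250, 64) = 64 -> batch of 64
    # len(c)=6,    numSlices=4, _batchSize=64 -> min(1, 64)   = 1  -> batch of 1
    # len(c)=1000, numSlices=4, _batchSize=0  -> min(250, 0)  = 0  -> max clamps to 1
    # (the 0 and -1 sentinels for "auto"/"unlimited" both clamp to 1 here)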
@@ -328,8 +321,7 @@ class SparkContext(object):
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
"""
minPartitions = minPartitions or self.defaultMinPartitions
- return RDD(self._jsc.objectFile(name, minPartitions), self,
- BatchedSerializer(PickleSerializer()))
+ return RDD(self._jsc.objectFile(name, minPartitions), self)
def textFile(self, name, minPartitions=None, use_unicode=True):
"""
@@ -405,7 +397,7 @@ class SparkContext(object):
return jm
def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None,
- valueConverter=None, minSplits=None, batchSize=None):
+ valueConverter=None, minSplits=None, batchSize=0):
"""
Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -427,17 +419,15 @@ class SparkContext(object):
:param minSplits: minimum splits in dataset
(default min(2, sc.defaultParallelism))
:param batchSize: The number of Python objects represented as a single
- Java object. (default sc._default_batch_size_for_serialized_input)
+ Java object. (default 0, choose batchSize automatically)
"""
minSplits = minSplits or min(self.defaultParallelism, 2)
- batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
- ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass,
keyConverter, valueConverter, minSplits, batchSize)
- return RDD(jrdd, self, ser)
+ return RDD(jrdd, self)
def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
- valueConverter=None, conf=None, batchSize=None):
+ valueConverter=None, conf=None, batchSize=0):
"""
Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
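(Illustration, not part of the patch: reading a SequenceFile under the new batchSize=0 default; the path and Writable classes are illustrative.)

    rdd = sc.sequenceFile("hdfs:///data/pairs.seq",
                          keyClass="org.apache.hadoop.io.Text",
                          valueClass="org.apache.hadoop.io.IntWritable")
    # batchSize stays 0, so the batch size is chosen automatically and the
    # returned RDD uses the default deserializer instead of an explicit one.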
@@ -458,18 +448,16 @@ class SparkContext(object):
:param conf: Hadoop configuration, passed in as a dict
(None by default)
:param batchSize: The number of Python objects represented as a single
- Java object. (default sc._default_batch_size_for_serialized_input)
+ Java object. (default 0, choose batchSize automatically)
"""
jconf = self._dictToJavaMap(conf)
- batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
- ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,
valueClass, keyConverter, valueConverter,
jconf, batchSize)
- return RDD(jrdd, self, ser)
+ return RDD(jrdd, self)
def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
- valueConverter=None, conf=None, batchSize=None):
+ valueConverter=None, conf=None, batchSize=0):
"""
Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
Hadoop configuration, which is passed in as a Python dict.
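(Illustration, not part of the patch: newAPIHadoopFile with the new batchSize=0 default; the path is illustrative, the classes are standard Hadoop.)

    rdd = sc.newAPIHadoopFile(
        "hdfs:///data/in",
        "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "org.apache.hadoop.io.Text")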
@@ -487,18 +475,16 @@ class SparkContext(object):
:param conf: Hadoop configuration, passed in as a dict
(None by default)
:param batchSize: The number of Python objects represented as a single
- Java object. (default sc._default_batch_size_for_serialized_input)
+ Java object. (default 0, choose batchSize automatically)
"""
jconf = self._dictToJavaMap(conf)
- batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
- ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
valueClass, keyConverter, valueConverter,
jconf, batchSize)
- return RDD(jrdd, self, ser)
+ return RDD(jrdd, self)
def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
- valueConverter=None, conf=None, batchSize=None):
+ valueConverter=None, conf=None, batchSize=0):
"""
Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
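(Illustration, not part of the patch: the conf-driven newAPIHadoopRDD variant; the input path in the conf dict is illustrative.)

    conf = {"mapreduce.input.fileinputformat.inputdir": "hdfs:///data/in"}
    rdd = sc.newAPIHadoopRDD(
        "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "org.apache.hadoop.io.Text",
        conf=conf)  # batchSize=0: batching decided on the JVM side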
@@ -519,18 +505,16 @@ class SparkContext(object):
:param conf: Hadoop configuration, passed in as a dict
(None by default)
:param batchSize: The number of Python objects represented as a single
- Java object. (default sc._default_batch_size_for_serialized_input)
+ Java object. (default 0, choose batchSize automatically)
"""
jconf = self._dictToJavaMap(conf)
- batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
- ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
valueClass, keyConverter, valueConverter,
jconf, batchSize)
- return RDD(jrdd, self, ser)
+ return RDD(jrdd, self)
def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
- valueConverter=None, conf=None, batchSize=None):
+ valueConverter=None, conf=None, batchSize=0):
"""
Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
Hadoop configuration, which is passed in as a Python dict.
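(Illustration, not part of the patch: hadoopFile mirrors newAPIHadoopFile but takes old-API mapred classes; the path is illustrative.)

    rdd = sc.hadoopFile(
        "hdfs:///data/in",
        "org.apache.hadoop.mapred.TextInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "org.apache.hadoop.io.Text")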
@@ -548,15 +532,13 @@ class SparkContext(object):
:param conf: Hadoop configuration, passed in as a dict
(None by default)
:param batchSize: The number of Python objects represented as a single
- Java object. (default sc._default_batch_size_for_serialized_input)
+ Java object. (default 0, choose batchSize automatically)
"""
jconf = self._dictToJavaMap(conf)
- batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
- ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass,
valueClass, keyConverter, valueConverter,
jconf, batchSize)
- return RDD(jrdd, self, ser)
+ return RDD(jrdd, self)
def _checkpointFile(self, name, input_deserializer):
jrdd = self._jsc.checkpointFile(name)
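(Illustration, not part of the patch: the conf-driven, old-API hadoopRDD above follows the same pattern; the input path in the conf dict is illustrative.)

    conf = {"mapred.input.dir": "hdfs:///data/in"}
    rdd = sc.hadoopRDD(
        "org.apache.hadoop.mapred.TextInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "org.apache.hadoop.io.Text",
        conf=conf)  # batchSize again defaults to 0 (automatic)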
@@ -836,7 +818,7 @@ def _test():
import doctest
import tempfile
globs = globals().copy()
- globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+ globs['sc'] = SparkContext('local[4]', 'PythonTest')
globs['tempdir'] = tempfile.mkdtemp()
atexit.register(lambda: shutil.rmtree(globs['tempdir']))
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)