Diffstat (limited to 'python/pyspark/context.py')
-rw-r--r--  python/pyspark/context.py | 51
1 file changed, 36 insertions(+), 15 deletions(-)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 830a6ee03f..7b0f8d83ae 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -60,6 +60,7 @@ class SparkContext(object):
_active_spark_context = None
_lock = Lock()
_python_includes = None # zip and egg files that need to be added to PYTHONPATH
+ _default_batch_size_for_serialized_input = 10
def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
@@ -378,7 +379,7 @@ class SparkContext(object):
return jm
def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None,
- valueConverter=None, minSplits=None):
+ valueConverter=None, minSplits=None, batchSize=None):
"""
Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -398,14 +399,18 @@ class SparkContext(object):
@param valueConverter: (None by default)
@param minSplits: minimum splits in dataset
(default min(2, sc.defaultParallelism))
+ @param batchSize: The number of Python objects represented as a single
+ Java object. (default sc._default_batch_size_for_serialized_input)
"""
minSplits = minSplits or min(self.defaultParallelism, 2)
+ batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+ ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass,
- keyConverter, valueConverter, minSplits)
- return RDD(jrdd, self, PickleSerializer())
+ keyConverter, valueConverter, minSplits, batchSize)
+ return RDD(jrdd, self, ser)
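A minimal usage sketch for the parameter added above (the path and Writable classes are hypothetical, chosen only for illustration; `sc` is an existing SparkContext):

    # Read (IntWritable, Text) pairs, pickling 100 Python objects per Java object.
    rdd = sc.sequenceFile("hdfs:///tmp/example.seq",
                          keyClass="org.apache.hadoop.io.IntWritable",
                          valueClass="org.apache.hadoop.io.Text",
                          batchSize=100)
    # Omitting batchSize falls back to _default_batch_size_for_serialized_input (10).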
def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
- valueConverter=None, conf=None):
+ valueConverter=None, conf=None, batchSize=None):
"""
Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -425,14 +430,18 @@ class SparkContext(object):
@param valueConverter: (None by default)
@param conf: Hadoop configuration, passed in as a dict
(None by default)
+ @param batchSize: The number of Python objects represented as a single
+ Java object. (default sc._default_batch_size_for_serialized_input)
"""
jconf = self._dictToJavaMap(conf)
+ batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+ ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,
- valueClass, keyConverter, valueConverter, jconf)
- return RDD(jrdd, self, PickleSerializer())
+ valueClass, keyConverter, valueConverter, jconf, batchSize)
+ return RDD(jrdd, self, ser)
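The same argument applies to the 'new API' file reader; a hedged sketch, again with a hypothetical input path:

    # Plain text input through the new-API TextInputFormat; values are the file's lines.
    lines = sc.newAPIHadoopFile(
        "hdfs:///tmp/logs",
        "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "org.apache.hadoop.io.Text",
        batchSize=50)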
def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
- valueConverter=None, conf=None):
+ valueConverter=None, conf=None, batchSize=None):
"""
Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
Hadoop configuration, which is passed in as a Python dict.
@@ -449,14 +458,18 @@ class SparkContext(object):
@param valueConverter: (None by default)
@param conf: Hadoop configuration, passed in as a dict
(None by default)
+ @param batchSize: The number of Python objects represented as a single
+ Java object. (default sc._default_batch_size_for_serialized_input)
"""
jconf = self._dictToJavaMap(conf)
+ batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+ ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
- valueClass, keyConverter, valueConverter, jconf)
- return RDD(jrdd, self, PickleSerializer())
+ valueClass, keyConverter, valueConverter, jconf, batchSize)
+ return RDD(jrdd, self, ser)
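For the RDD variant, the input location comes from the Hadoop configuration dict rather than a path argument; a sketch assuming a Hadoop 2 style property name:

    # The configuration key below is an assumption; adjust it to the cluster's Hadoop version.
    conf = {"mapreduce.input.fileinputformat.inputdir": "hdfs:///tmp/logs"}
    rdd = sc.newAPIHadoopRDD(
        "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "org.apache.hadoop.io.Text",
        conf=conf,
        batchSize=1)   # batchSize=1 disables batching: plain PickleSerializer is used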
def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
- valueConverter=None, conf=None):
+ valueConverter=None, conf=None, batchSize=None):
"""
Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -476,14 +489,18 @@ class SparkContext(object):
@param valueConverter: (None by default)
@param conf: Hadoop configuration, passed in as a dict
(None by default)
+ @param batchSize: The number of Python objects represented as a single
+ Java object. (default sc._default_batch_size_for_serialized_input)
"""
jconf = self._dictToJavaMap(conf)
+ batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+ ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
- valueClass, keyConverter, valueConverter, jconf)
- return RDD(jrdd, self, PickleSerializer())
+ valueClass, keyConverter, valueConverter, jconf, batchSize)
+ return RDD(jrdd, self, ser)
def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
- valueConverter=None, conf=None):
+ valueConverter=None, conf=None, batchSize=None):
"""
Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
Hadoop configuration, which is passed in as a Python dict.
@@ -500,11 +517,15 @@ class SparkContext(object):
@param valueConverter: (None by default)
@param conf: Hadoop configuration, passed in as a dict
(None by default)
+ @param batchSize: The number of Python objects represented as a single
+ Java object. (default sc._default_batch_size_for_serialized_input)
"""
jconf = self._dictToJavaMap(conf)
+ batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+ ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass,
- keyConverter, valueConverter, jconf)
- return RDD(jrdd, self, PickleSerializer())
+ keyConverter, valueConverter, jconf, batchSize)
+ return RDD(jrdd, self, ser)
def _checkpointFile(self, name, input_deserializer):
jrdd = self._jsc.checkpointFile(name)