diff options
Diffstat (limited to 'python/pyspark/context.py')
-rw-r--r-- | python/pyspark/context.py | 51 |
1 files changed, 36 insertions, 15 deletions
diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 830a6ee03f..7b0f8d83ae 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -60,6 +60,7 @@ class SparkContext(object): _active_spark_context = None _lock = Lock() _python_includes = None # zip and egg files that need to be added to PYTHONPATH + _default_batch_size_for_serialized_input = 10 def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None, @@ -378,7 +379,7 @@ class SparkContext(object): return jm def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None, - valueConverter=None, minSplits=None): + valueConverter=None, minSplits=None, batchSize=None): """ Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. @@ -398,14 +399,18 @@ class SparkContext(object): @param valueConverter: @param minSplits: minimum splits in dataset (default min(2, sc.defaultParallelism)) + @param batchSize: The number of Python objects represented as a single + Java object. (default sc._default_batch_size_for_serialized_input) """ minSplits = minSplits or min(self.defaultParallelism, 2) + batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) + ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass, - keyConverter, valueConverter, minSplits) - return RDD(jrdd, self, PickleSerializer()) + keyConverter, valueConverter, minSplits, batchSize) + return RDD(jrdd, self, ser) def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None, - valueConverter=None, conf=None): + valueConverter=None, conf=None, batchSize=None): """ Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. @@ -425,14 +430,18 @@ class SparkContext(object): @param valueConverter: (None by default) @param conf: Hadoop configuration, passed in as a dict (None by default) + @param batchSize: The number of Python objects represented as a single + Java object. (default sc._default_batch_size_for_serialized_input) """ jconf = self._dictToJavaMap(conf) + batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) + ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass, - valueClass, keyConverter, valueConverter, jconf) - return RDD(jrdd, self, PickleSerializer()) + valueClass, keyConverter, valueConverter, jconf, batchSize) + return RDD(jrdd, self, ser) def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None, - valueConverter=None, conf=None): + valueConverter=None, conf=None, batchSize=None): """ Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration, which is passed in as a Python dict. @@ -449,14 +458,18 @@ class SparkContext(object): @param valueConverter: (None by default) @param conf: Hadoop configuration, passed in as a dict (None by default) + @param batchSize: The number of Python objects represented as a single + Java object. (default sc._default_batch_size_for_serialized_input) """ jconf = self._dictToJavaMap(conf) + batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) + ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass, - valueClass, keyConverter, valueConverter, jconf) - return RDD(jrdd, self, PickleSerializer()) + valueClass, keyConverter, valueConverter, jconf, batchSize) + return RDD(jrdd, self, ser) def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None, - valueConverter=None, conf=None): + valueConverter=None, conf=None, batchSize=None): """ Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. @@ -476,14 +489,18 @@ class SparkContext(object): @param valueConverter: (None by default) @param conf: Hadoop configuration, passed in as a dict (None by default) + @param batchSize: The number of Python objects represented as a single + Java object. (default sc._default_batch_size_for_serialized_input) """ jconf = self._dictToJavaMap(conf) + batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) + ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass, - valueClass, keyConverter, valueConverter, jconf) - return RDD(jrdd, self, PickleSerializer()) + valueClass, keyConverter, valueConverter, jconf, batchSize) + return RDD(jrdd, self, ser) def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None, - valueConverter=None, conf=None): + valueConverter=None, conf=None, batchSize=None): """ Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration, which is passed in as a Python dict. @@ -500,11 +517,15 @@ class SparkContext(object): @param valueConverter: (None by default) @param conf: Hadoop configuration, passed in as a dict (None by default) + @param batchSize: The number of Python objects represented as a single + Java object. (default sc._default_batch_size_for_serialized_input) """ jconf = self._dictToJavaMap(conf) + batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) + ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass, - keyConverter, valueConverter, jconf) - return RDD(jrdd, self, PickleSerializer()) + keyConverter, valueConverter, jconf, batchSize) + return RDD(jrdd, self, ser) def _checkpointFile(self, name, input_deserializer): jrdd = self._jsc.checkpointFile(name) |