diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/context.py | 2 | ||||
-rw-r--r-- | python/pyspark/serializers.py | 4 |
2 files changed, 5 insertions, 1 deletions
diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 593d74bca5..64f6a3ca6b 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -319,7 +319,7 @@ class SparkContext(object): # Make sure we distribute data evenly if it's smaller than self.batchSize if "__len__" not in dir(c): c = list(c) # Make it a list so we can compute its length - batchSize = max(1, min(len(c) // numSlices, self._batchSize)) + batchSize = max(1, min(len(c) // numSlices, self._batchSize or 1024)) serializer = BatchedSerializer(self._unbatched_serializer, batchSize) serializer.dump_stream(c, tempFile) tempFile.close() diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index bd08c9a6d2..b8bda83517 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -181,6 +181,10 @@ class BatchedSerializer(Serializer): def _batched(self, iterator): if self.batchSize == self.UNLIMITED_BATCH_SIZE: yield list(iterator) + elif hasattr(iterator, "__len__") and hasattr(iterator, "__getslice__"): + n = len(iterator) + for i in xrange(0, n, self.batchSize): + yield iterator[i: i + self.batchSize] else: items = [] count = 0 |