aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/shuffle.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/shuffle.py')
-rw-r--r-- python/pyspark/shuffle.py 7
1 files changed, 3 insertions, 4 deletions
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index d57a802e47..5931e923c2 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -25,7 +25,7 @@ import itertools
import random
import pyspark.heapq3 as heapq
-from pyspark.serializers import BatchedSerializer, PickleSerializer
+from pyspark.serializers import AutoBatchedSerializer, PickleSerializer
try:
import psutil
@@ -213,8 +213,7 @@ class ExternalMerger(Merger):
Merger.__init__(self, aggregator)
self.memory_limit = memory_limit
# default serializer is only used for tests
- self.serializer = serializer or \
- BatchedSerializer(PickleSerializer(), 1024)
+ self.serializer = serializer or AutoBatchedSerializer(PickleSerializer())
self.localdirs = localdirs or _get_local_dirs(str(id(self)))
# number of partitions when spill data into disks
self.partitions = partitions
@@ -470,7 +469,7 @@ class ExternalSorter(object):
def __init__(self, memory_limit, serializer=None):
self.memory_limit = memory_limit
self.local_dirs = _get_local_dirs("sort")
- self.serializer = serializer or BatchedSerializer(PickleSerializer(), 1024)
+ self.serializer = serializer or AutoBatchedSerializer(PickleSerializer())
def _get_path(self, n):
""" Choose one directory for spill by number n """