aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/shuffle.py
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2014-11-03 23:56:14 -0800
committerJosh Rosen <joshrosen@databricks.com>2014-11-03 23:56:14 -0800
commite4f42631a68b473ce706429915f3f08042af2119 (patch)
tree557ff754b9936addfb9628bfcba462802ff6ec1c /python/pyspark/shuffle.py
parentb671ce047d036b8923007902826038b01e836e8a (diff)
downloadspark-e4f42631a68b473ce706429915f3f08042af2119.tar.gz
spark-e4f42631a68b473ce706429915f3f08042af2119.tar.bz2
spark-e4f42631a68b473ce706429915f3f08042af2119.zip
[SPARK-3886] [PySpark] simplify serializer, use AutoBatchedSerializer by default.
This PR simplifies the serializer, always using a batched serializer (AutoBatchedSerializer by default), even when the batch size is 1. Author: Davies Liu <davies@databricks.com> This patch had conflicts when merged, resolved by Committer: Josh Rosen <joshrosen@databricks.com> Closes #2920 from davies/fix_autobatch and squashes the following commits: e544ef9 [Davies Liu] revert unrelated change 6880b14 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch 1d557fc [Davies Liu] fix tests 8180907 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch 76abdce [Davies Liu] clean up 53fa60b [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch d7ac751 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch 2cc2497 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch b4292ce [Davies Liu] fix bug in master d79744c [Davies Liu] recover hive tests be37ece [Davies Liu] refactor eb3938d [Davies Liu] refactor serializer in scala 8d77ef2 [Davies Liu] simplify serializer, use AutoBatchedSerializer by default.
Diffstat (limited to 'python/pyspark/shuffle.py')
-rw-r--r--python/pyspark/shuffle.py7
1 file changed, 3 insertions, 4 deletions
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index d57a802e47..5931e923c2 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -25,7 +25,7 @@ import itertools
import random
import pyspark.heapq3 as heapq
-from pyspark.serializers import BatchedSerializer, PickleSerializer
+from pyspark.serializers import AutoBatchedSerializer, PickleSerializer
try:
import psutil
@@ -213,8 +213,7 @@ class ExternalMerger(Merger):
Merger.__init__(self, aggregator)
self.memory_limit = memory_limit
# default serializer is only used for tests
- self.serializer = serializer or \
- BatchedSerializer(PickleSerializer(), 1024)
+ self.serializer = serializer or AutoBatchedSerializer(PickleSerializer())
self.localdirs = localdirs or _get_local_dirs(str(id(self)))
# number of partitions when spill data into disks
self.partitions = partitions
@@ -470,7 +469,7 @@ class ExternalSorter(object):
def __init__(self, memory_limit, serializer=None):
self.memory_limit = memory_limit
self.local_dirs = _get_local_dirs("sort")
- self.serializer = serializer or BatchedSerializer(PickleSerializer(), 1024)
+ self.serializer = serializer or AutoBatchedSerializer(PickleSerializer())
def _get_path(self, n):
""" Choose one directory for spill by number n """