diff options
author | Josh Rosen <joshrosen@databricks.com> | 2014-10-22 14:49:58 -0700 |
---|---|---|
committer | Andrew Or <andrewor14@gmail.com> | 2014-10-22 14:49:58 -0700 |
commit | 813effc701fc27121c6f23ab32882932016fdbe0 (patch) | |
tree | 6983c9fc478a4b6fe4594d5f3442510462e4ca9c /yarn | |
parent | 97cf19f64e924569892e0a0417de19329855f4af (diff) | |
download | spark-813effc701fc27121c6f23ab32882932016fdbe0.tar.gz spark-813effc701fc27121c6f23ab32882932016fdbe0.tar.bz2 spark-813effc701fc27121c6f23ab32882932016fdbe0.zip |
[SPARK-3426] Fix sort-based shuffle error when spark.shuffle.compress and spark.shuffle.spill.compress settings are different
This PR fixes SPARK-3426, an issue where sort-based shuffle crashes if the
`spark.shuffle.spill.compress` and `spark.shuffle.compress` settings have
different values.
The problem is that sort-based shuffle's read and write paths use different
settings for determining whether to apply compression. ExternalSorter writes
runs to files using `TempBlockId` ids, which causes
`spark.shuffle.spill.compress` to be used for enabling compression, but these
spilled files end up being shuffled over the network and read as shuffle files
using `ShuffleBlockId` by BlockStoreShuffleFetcher, which causes
`spark.shuffle.compress` to be used for enabling decompression. As a result,
this leads to errors when these settings disagree.
Based on the discussions in #2247 and #2178, it sounds like we don't want to
remove the `spark.shuffle.spill.compress` setting. Therefore, I've tried to
come up with a fix where `spark.shuffle.spill.compress` is used to compress
data that's read and written locally and `spark.shuffle.compress` is used to
compress any data that will be fetched / read as shuffle blocks.
To do this, I split `TempBlockId` into two new id types, `TempLocalBlockId` and
`TempShuffleBlockId`, which map to `spark.shuffle.spill.compress` and
`spark.shuffle.compress`, respectively. ExternalAppendOnlyMap also used temp
blocks for spilling data. It looks like ExternalSorter was designed to be
a generic sorter but its configuration already happens to be tied to sort-based
shuffle, so I think it's fine if we use `spark.shuffle.compress` to compress
its spills; we can move the compression configuration to the constructor in
a later commit if we find that ExternalSorter is being used in other contexts
where we want different configuration options to control compression. To
summarize:
**Before:**
| | ExternalAppendOnlyMap | ExternalSorter |
|-------|------------------------------|------------------------------|
| Read | spark.shuffle.spill.compress | spark.shuffle.compress |
| Write | spark.shuffle.spill.compress | spark.shuffle.spill.compress |
**After:**
| | ExternalAppendOnlyMap | ExternalSorter |
|-------|------------------------------|------------------------|
| Read | spark.shuffle.spill.compress | spark.shuffle.compress |
| Write | spark.shuffle.spill.compress | spark.shuffle.compress |
Thanks to andrewor14 for debugging this with me!
Author: Josh Rosen <joshrosen@databricks.com>
Closes #2890 from JoshRosen/SPARK-3426 and squashes the following commits:
1921cf6 [Josh Rosen] Minor edit for clarity.
c8dd8f2 [Josh Rosen] Add comment explaining use of createTempShuffleBlock().
2c687b9 [Josh Rosen] Fix SPARK-3426.
91e7e40 [Josh Rosen] Combine tests into single test of all combinations
76ca65e [Josh Rosen] Add regression test for SPARK-3426.
Diffstat (limited to 'yarn')
0 files changed, 0 insertions, 0 deletions