diff options
author | Ryan Blue <blue@apache.org> | 2016-07-08 12:37:26 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-07-08 12:37:26 -0700 |
commit | 67e085ef6dd62774095f3187844c091db1a6a72c (patch) | |
tree | f403345e93f19a58bed6eb35151a9e9b90d6c2e0 /core/src/main/java | |
parent | 38cf8f2a50068f80350740ac28e31c8accd20634 (diff) | |
download | spark-67e085ef6dd62774095f3187844c091db1a6a72c.tar.gz spark-67e085ef6dd62774095f3187844c091db1a6a72c.tar.bz2 spark-67e085ef6dd62774095f3187844c091db1a6a72c.zip |
[SPARK-16420] Ensure compression streams are closed.
## What changes were proposed in this pull request?
This uses the try/finally pattern to ensure streams are closed after use. `UnsafeShuffleWriter` wasn't closing compression streams, causing them to leak resources until garbage collected. This was causing a problem with codecs that use off-heap memory.
## How was this patch tested?
Current tests are sufficient. This should not change behavior.
Author: Ryan Blue <blue@apache.org>
Closes #14093 from rdblue/SPARK-16420-unsafe-shuffle-writer-leak.
Diffstat (limited to 'core/src/main/java')
-rw-r--r-- | core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java | 17 |
1 files changed, 12 insertions, 5 deletions
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 05fa04c44d..08fb887bbd 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -349,12 +349,19 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> { for (int i = 0; i < spills.length; i++) { final long partitionLengthInSpill = spills[i].partitionLengths[partition]; if (partitionLengthInSpill > 0) { - InputStream partitionInputStream = - new LimitedInputStream(spillInputStreams[i], partitionLengthInSpill); - if (compressionCodec != null) { - partitionInputStream = compressionCodec.compressedInputStream(partitionInputStream); + InputStream partitionInputStream = null; + boolean innerThrewException = true; + try { + partitionInputStream = + new LimitedInputStream(spillInputStreams[i], partitionLengthInSpill, false); + if (compressionCodec != null) { + partitionInputStream = compressionCodec.compressedInputStream(partitionInputStream); + } + ByteStreams.copy(partitionInputStream, mergedFileOutputStream); + innerThrewException = false; + } finally { + Closeables.close(partitionInputStream, innerThrewException); } - ByteStreams.copy(partitionInputStream, mergedFileOutputStream); } } mergedFileOutputStream.flush(); |