author     Ryan Blue <blue@apache.org>        2016-07-08 12:37:26 -0700
committer  Reynold Xin <rxin@databricks.com>  2016-07-08 12:37:26 -0700
commit     67e085ef6dd62774095f3187844c091db1a6a72c
tree       f403345e93f19a58bed6eb35151a9e9b90d6c2e0
parent     38cf8f2a50068f80350740ac28e31c8accd20634
[SPARK-16420] Ensure compression streams are closed.
## What changes were proposed in this pull request?

This uses the try/finally pattern to ensure streams are closed after use. `UnsafeShuffleWriter` wasn't closing compression streams, causing them to leak resources until garbage collected. This was causing a problem with codecs that use off-heap memory.

## How was this patch tested?

Current tests are sufficient. This should not change behavior.

Author: Ryan Blue <blue@apache.org>

Closes #14093 from rdblue/SPARK-16420-unsafe-shuffle-writer-leak.
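The fix wraps the per-partition copy in `try`/`finally` and closes the stream with Guava's `Closeables.close(stream, swallowIOException)`, so the compression stream is released even when the copy throws. Below is a minimal, self-contained sketch of that pattern, assuming only that Guava is on the classpath; the class and method names are illustrative, not Spark code:

```java
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Illustrative sketch of the close-on-error pattern used in the patch.
public final class CloseOnErrorExample {

  static void copyAndClose(InputStream in, OutputStream out) throws IOException {
    boolean threwException = true;
    try {
      ByteStreams.copy(in, out);
      threwException = false;
    } finally {
      // If the copy failed, swallow any IOException from close() so the
      // original exception propagates; if the copy succeeded, let a
      // close() failure surface normally.
      Closeables.close(in, threwException);
    }
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    copyAndClose(new ByteArrayInputStream("hello".getBytes()), out);
    System.out.println(out.toString()); // prints "hello"
  }
}
```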
Diffstat (limited to 'core/src/main/java/org')
-rw-r--r-- core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java | 17 ++++++++++++-----
1 file changed, 12 insertions(+), 5 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 05fa04c44d..08fb887bbd 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -349,12 +349,19 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
         for (int i = 0; i < spills.length; i++) {
           final long partitionLengthInSpill = spills[i].partitionLengths[partition];
           if (partitionLengthInSpill > 0) {
-            InputStream partitionInputStream =
-              new LimitedInputStream(spillInputStreams[i], partitionLengthInSpill);
-            if (compressionCodec != null) {
-              partitionInputStream = compressionCodec.compressedInputStream(partitionInputStream);
+            InputStream partitionInputStream = null;
+            boolean innerThrewException = true;
+            try {
+              partitionInputStream =
+                new LimitedInputStream(spillInputStreams[i], partitionLengthInSpill, false);
+              if (compressionCodec != null) {
+                partitionInputStream = compressionCodec.compressedInputStream(partitionInputStream);
+              }
+              ByteStreams.copy(partitionInputStream, mergedFileOutputStream);
+              innerThrewException = false;
+            } finally {
+              Closeables.close(partitionInputStream, innerThrewException);
             }
-            ByteStreams.copy(partitionInputStream, mergedFileOutputStream);
           }
         }
         mergedFileOutputStream.flush();
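Two details of the change are worth noting, judging from the diff: `innerThrewException` starts out `true` and is cleared only after the copy succeeds, so `Closeables.close` swallows a secondary `IOException` from `close()` only when the copy itself already failed, keeping the original exception from being masked. And the new third argument `false` to `LimitedInputStream` asks it not to close the wrapped spill stream, which is presumably still needed for the remaining partitions and closed separately.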