diff options
author | Ryan Blue <blue@apache.org> | 2016-07-08 12:37:26 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-07-08 12:37:26 -0700 |
commit | 67e085ef6dd62774095f3187844c091db1a6a72c (patch) | |
tree | f403345e93f19a58bed6eb35151a9e9b90d6c2e0 /common/network-common/src | |
parent | 38cf8f2a50068f80350740ac28e31c8accd20634 (diff) | |
download | spark-67e085ef6dd62774095f3187844c091db1a6a72c.tar.gz spark-67e085ef6dd62774095f3187844c091db1a6a72c.tar.bz2 spark-67e085ef6dd62774095f3187844c091db1a6a72c.zip |
[SPARK-16420] Ensure compression streams are closed.
## What changes were proposed in this pull request?
This uses the try/finally pattern to ensure streams are closed after use. `UnsafeShuffleWriter` wasn't closing compression streams, causing them to leak resources until garbage collected. This was causing a problem with codecs that use off-heap memory.
## How was this patch tested?
Current tests are sufficient. This should not change behavior.
Author: Ryan Blue <blue@apache.org>
Closes #14093 from rdblue/SPARK-16420-unsafe-shuffle-writer-leak.
Diffstat (limited to 'common/network-common/src')
-rw-r--r-- | common/network-common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java | 23 |
1 files changed, 23 insertions, 0 deletions
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java b/common/network-common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java index 922c37a10e..e79eef0325 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java @@ -48,11 +48,27 @@ import com.google.common.base.Preconditions; * use this functionality in both a Guava 11 environment and a Guava >14 environment. */ public final class LimitedInputStream extends FilterInputStream { + private final boolean closeWrappedStream; private long left; private long mark = -1; public LimitedInputStream(InputStream in, long limit) { + this(in, limit, true); + } + + /** + * Create a LimitedInputStream that will read {@code limit} bytes from {@code in}. + * <p> + * If {@code closeWrappedStream} is true, this will close {@code in} when it is closed. + * Otherwise, the stream is left open for reading its remaining content. + * + * @param in a {@link InputStream} to read from + * @param limit the number of bytes to read + * @param closeWrappedStream whether to close {@code in} when {@link #close} is called + */ + public LimitedInputStream(InputStream in, long limit, boolean closeWrappedStream) { super(in); + this.closeWrappedStream = closeWrappedStream; Preconditions.checkNotNull(in); Preconditions.checkArgument(limit >= 0, "limit must be non-negative"); left = limit; @@ -102,4 +118,11 @@ public final class LimitedInputStream extends FilterInputStream { left -= skipped; return skipped; } + + @Override + public void close() throws IOException { + if (closeWrappedStream) { + super.close(); + } + } } |