aboutsummaryrefslogtreecommitdiff
path: root/common
diff options
context:
space:
mode:
authorRyan Blue <blue@apache.org>2016-07-08 12:37:26 -0700
committerReynold Xin <rxin@databricks.com>2016-07-08 12:37:26 -0700
commit67e085ef6dd62774095f3187844c091db1a6a72c (patch)
treef403345e93f19a58bed6eb35151a9e9b90d6c2e0 /common
parent38cf8f2a50068f80350740ac28e31c8accd20634 (diff)
downloadspark-67e085ef6dd62774095f3187844c091db1a6a72c.tar.gz
spark-67e085ef6dd62774095f3187844c091db1a6a72c.tar.bz2
spark-67e085ef6dd62774095f3187844c091db1a6a72c.zip
[SPARK-16420] Ensure compression streams are closed.
## What changes were proposed in this pull request? This uses the try/finally pattern to ensure streams are closed after use. `UnsafeShuffleWriter` wasn't closing compression streams, causing them to leak resources until garbage collected. This was causing a problem with codecs that use off-heap memory. ## How was this patch tested? Current tests are sufficient. This should not change behavior. Author: Ryan Blue <blue@apache.org> Closes #14093 from rdblue/SPARK-16420-unsafe-shuffle-writer-leak.
Diffstat (limited to 'common')
-rw-r--r--common/network-common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java23
1 files changed, 23 insertions, 0 deletions
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java b/common/network-common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java
index 922c37a10e..e79eef0325 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java
@@ -48,11 +48,27 @@ import com.google.common.base.Preconditions;
* use this functionality in both a Guava 11 environment and a Guava &gt;14 environment.
*/
public final class LimitedInputStream extends FilterInputStream {
+ private final boolean closeWrappedStream;
private long left;
private long mark = -1;
public LimitedInputStream(InputStream in, long limit) {
+ this(in, limit, true);
+ }
+
+ /**
+ * Create a LimitedInputStream that will read {@code limit} bytes from {@code in}.
+ * <p>
+ * If {@code closeWrappedStream} is true, this will close {@code in} when it is closed.
+ * Otherwise, the stream is left open for reading its remaining content.
+ *
+ * @param in a {@link InputStream} to read from
+ * @param limit the number of bytes to read
+ * @param closeWrappedStream whether to close {@code in} when {@link #close} is called
+ */
+ public LimitedInputStream(InputStream in, long limit, boolean closeWrappedStream) {
super(in);
+ this.closeWrappedStream = closeWrappedStream;
Preconditions.checkNotNull(in);
Preconditions.checkArgument(limit >= 0, "limit must be non-negative");
left = limit;
@@ -102,4 +118,11 @@ public final class LimitedInputStream extends FilterInputStream {
left -= skipped;
return skipped;
}
+
+ @Override
+ public void close() throws IOException {
+ if (closeWrappedStream) {
+ super.close();
+ }
+ }
}