diff options
author | Josh Rosen <joshrosen@databricks.com> | 2015-11-18 16:00:35 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-11-18 16:00:35 -0800 |
commit | 4b117121900e5f242e7c8f46a69164385f0da7cc (patch) | |
tree | 3134b152d511565a7bc4ba7b2cc8e61f859a9f2a /core | |
parent | c07a50b86254578625be777b1890ff95e832ac6e (diff) | |
download | spark-4b117121900e5f242e7c8f46a69164385f0da7cc.tar.gz spark-4b117121900e5f242e7c8f46a69164385f0da7cc.tar.bz2 spark-4b117121900e5f242e7c8f46a69164385f0da7cc.zip |
[SPARK-11495] Fix potential socket / file handle leaks that were found via static analysis
The HP Fortify Opens Source Review team (https://www.hpfod.com/open-source-review-project) reported a handful of potential resource leaks that were discovered using their static analysis tool. We should fix the issues identified by their scan.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #9455 from JoshRosen/fix-potential-resource-leaks.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java | 7 | ||||
-rw-r--r-- | core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java | 38 |
2 files changed, 30 insertions, 15 deletions
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 04694dc544..3387f9a417 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.LinkedList; import com.google.common.annotations.VisibleForTesting; +import com.google.common.io.Closeables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -272,6 +273,7 @@ public final class BytesToBytesMap extends MemoryConsumer { } } try { + Closeables.close(reader, /* swallowIOException = */ false); reader = spillWriters.getFirst().getReader(blockManager); recordsInPage = -1; } catch (IOException e) { @@ -318,6 +320,11 @@ public final class BytesToBytesMap extends MemoryConsumer { try { reader.loadNext(); } catch (IOException e) { + try { + reader.close(); + } catch(IOException e2) { + logger.error("Error while closing spill reader", e2); + } // Scala iterator does not handle exception Platform.throwException(e); } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java index 039e940a35..dcb13e6581 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java @@ -20,8 +20,7 @@ package org.apache.spark.util.collection.unsafe.sort; import java.io.*; import com.google.common.io.ByteStreams; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.google.common.io.Closeables; import org.apache.spark.storage.BlockId; import org.apache.spark.storage.BlockManager; @@ -31,10 +30,8 @@ import org.apache.spark.unsafe.Platform; * Reads spill files written by {@link UnsafeSorterSpillWriter} (see that class for a description * of the file format). */ -public final class UnsafeSorterSpillReader extends UnsafeSorterIterator { - private static final Logger logger = LoggerFactory.getLogger(UnsafeSorterSpillReader.class); +public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implements Closeable { - private final File file; private InputStream in; private DataInputStream din; @@ -52,11 +49,15 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator { File file, BlockId blockId) throws IOException { assert (file.length() > 0); - this.file = file; final BufferedInputStream bs = new BufferedInputStream(new FileInputStream(file)); - this.in = blockManager.wrapForCompression(blockId, bs); - this.din = new DataInputStream(this.in); - numRecordsRemaining = din.readInt(); + try { + this.in = blockManager.wrapForCompression(blockId, bs); + this.din = new DataInputStream(this.in); + numRecordsRemaining = din.readInt(); + } catch (IOException e) { + Closeables.close(bs, /* swallowIOException = */ true); + throw e; + } } @Override @@ -75,12 +76,7 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator { ByteStreams.readFully(in, arr, 0, recordLength); numRecordsRemaining--; if (numRecordsRemaining == 0) { - in.close(); - if (!file.delete() && file.exists()) { - logger.warn("Unable to delete spill file {}", file.getPath()); - } - in = null; - din = null; + close(); } } @@ -103,4 +99,16 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator { public long getKeyPrefix() { return keyPrefix; } + + @Override + public void close() throws IOException { + if (in != null) { + try { + in.close(); + } finally { + in = null; + din = null; + } + } + } } |