author     Josh Rosen <joshrosen@databricks.com>   2015-11-18 16:00:35 -0800
committer  Reynold Xin <rxin@databricks.com>       2015-11-18 16:00:35 -0800
commit     4b117121900e5f242e7c8f46a69164385f0da7cc (patch)
tree       3134b152d511565a7bc4ba7b2cc8e61f859a9f2a
parent     c07a50b86254578625be777b1890ff95e832ac6e (diff)
[SPARK-11495] Fix potential socket / file handle leaks that were found via static analysis
The HP Fortify Open Source Review team (https://www.hpfod.com/open-source-review-project) reported a handful of potential resource leaks that were discovered using their static analysis tool. This patch fixes the issues identified by their scan.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #9455 from JoshRosen/fix-potential-resource-leaks.
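A minimal standalone sketch (not part of the patch) of the close-in-finally idiom the fixes below rely on: the resource is closed via Guava's Closeables.close, and IOExceptions from close() are swallowed only when an earlier exception is already propagating, so the original failure is not masked. The class and method names here are illustrative only.

import com.google.common.io.Closeables;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

public class LeakFreeRead {
  public static int readFirstByte(String path) throws IOException {
    InputStream in = null;
    boolean threw = true;
    try {
      in = new FileInputStream(path);
      int firstByte = in.read();
      threw = false;
      return firstByte;
    } finally {
      // Suppress close() failures only if the try block already threw;
      // Closeables.close(null, ...) is a no-op.
      Closeables.close(in, threw);
    }
  }
}

The same flag-flipped-at-the-end-of-try shape appears in the ChunkFetchIntegrationSuite and TestShuffleDataContext changes below.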
-rw-r--r--  core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java                          |  7
-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java | 38
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java           | 31
-rw-r--r--  network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java        | 15
-rw-r--r--  network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java   | 32
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java                 | 20
6 files changed, 90 insertions(+), 53 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 04694dc544..3387f9a417 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.Closeables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -272,6 +273,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
}
}
try {
+ Closeables.close(reader, /* swallowIOException = */ false);
reader = spillWriters.getFirst().getReader(blockManager);
recordsInPage = -1;
} catch (IOException e) {
@@ -318,6 +320,11 @@ public final class BytesToBytesMap extends MemoryConsumer {
try {
reader.loadNext();
} catch (IOException e) {
+ try {
+ reader.close();
+ } catch(IOException e2) {
+ logger.error("Error while closing spill reader", e2);
+ }
// Scala iterator does not handle exception
Platform.throwException(e);
}
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index 039e940a35..dcb13e6581 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -20,8 +20,7 @@ package org.apache.spark.util.collection.unsafe.sort;
import java.io.*;
import com.google.common.io.ByteStreams;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.common.io.Closeables;
import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.BlockManager;
@@ -31,10 +30,8 @@ import org.apache.spark.unsafe.Platform;
* Reads spill files written by {@link UnsafeSorterSpillWriter} (see that class for a description
* of the file format).
*/
-public final class UnsafeSorterSpillReader extends UnsafeSorterIterator {
- private static final Logger logger = LoggerFactory.getLogger(UnsafeSorterSpillReader.class);
+public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implements Closeable {
- private final File file;
private InputStream in;
private DataInputStream din;
@@ -52,11 +49,15 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator {
File file,
BlockId blockId) throws IOException {
assert (file.length() > 0);
- this.file = file;
final BufferedInputStream bs = new BufferedInputStream(new FileInputStream(file));
- this.in = blockManager.wrapForCompression(blockId, bs);
- this.din = new DataInputStream(this.in);
- numRecordsRemaining = din.readInt();
+ try {
+ this.in = blockManager.wrapForCompression(blockId, bs);
+ this.din = new DataInputStream(this.in);
+ numRecordsRemaining = din.readInt();
+ } catch (IOException e) {
+ Closeables.close(bs, /* swallowIOException = */ true);
+ throw e;
+ }
}
@Override
@@ -75,12 +76,7 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator {
ByteStreams.readFully(in, arr, 0, recordLength);
numRecordsRemaining--;
if (numRecordsRemaining == 0) {
- in.close();
- if (!file.delete() && file.exists()) {
- logger.warn("Unable to delete spill file {}", file.getPath());
- }
- in = null;
- din = null;
+ close();
}
}
@@ -103,4 +99,16 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator {
public long getKeyPrefix() {
return keyPrefix;
}
+
+ @Override
+ public void close() throws IOException {
+ if (in != null) {
+ try {
+ in.close();
+ } finally {
+ in = null;
+ din = null;
+ }
+ }
+ }
}
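A hypothetical standalone sketch of the two idioms the hunk above introduces: (1) if initialization fails after the file handle has been opened, close the underlying stream before rethrowing so the handle does not leak, and (2) make close() idempotent by nulling the reference in a finally block. GuardedReader and its header read are placeholders, not code from the patch.

import com.google.common.io.Closeables;

import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;

class GuardedReader implements Closeable {
  private DataInputStream din;

  GuardedReader(String path) throws IOException {
    BufferedInputStream bs = new BufferedInputStream(new FileInputStream(path));
    try {
      din = new DataInputStream(bs);
      din.readInt();  // e.g. read a header field; may throw
    } catch (IOException e) {
      // The file handle is already open, so release it before rethrowing.
      Closeables.close(bs, /* swallowIOException = */ true);
      throw e;
    }
  }

  @Override
  public void close() throws IOException {
    if (din != null) {
      try {
        din.close();
      } finally {
        din = null;  // a second close() becomes a no-op
      }
    }
  }
}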
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
index 99df259b4e..4b50fbf59f 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
@@ -18,6 +18,7 @@
package org.apache.spark.examples.streaming;
import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
@@ -121,23 +122,23 @@ public class JavaCustomReceiver extends Receiver<String> {
/** Create a socket connection and receive data until receiver is stopped */
private void receive() {
- Socket socket = null;
- String userInput = null;
-
try {
- // connect to the server
- socket = new Socket(host, port);
-
- BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
-
- // Until stopped or connection broken continue reading
- while (!isStopped() && (userInput = reader.readLine()) != null) {
- System.out.println("Received data '" + userInput + "'");
- store(userInput);
+ Socket socket = null;
+ BufferedReader reader = null;
+ String userInput = null;
+ try {
+ // connect to the server
+ socket = new Socket(host, port);
+ reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+ // Until stopped or connection broken continue reading
+ while (!isStopped() && (userInput = reader.readLine()) != null) {
+ System.out.println("Received data '" + userInput + "'");
+ store(userInput);
+ }
+ } finally {
+ Closeables.close(reader, /* swallowIOException = */ true);
+ Closeables.close(socket, /* swallowIOException = */ true);
}
- reader.close();
- socket.close();
-
// Restart in an attempt to connect again when server is active again
restart("Trying to connect again");
} catch(ConnectException ce) {
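An illustrative standalone sketch of the receive() pattern above: the reader and socket are always closed in a finally block, and IOExceptions from close() are swallowed because the caller is about to reconnect anyway. The class and method names are placeholders, not part of the patch.

import com.google.common.io.Closeables;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;

public class LineReceiver {
  public static void readLines(String host, int port) throws IOException {
    Socket socket = null;
    BufferedReader reader = null;
    try {
      socket = new Socket(host, port);
      reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
      String line;
      while ((line = reader.readLine()) != null) {
        System.out.println("Received data '" + line + "'");
      }
    } finally {
      // Both resources are released even if the read loop throws.
      Closeables.close(reader, /* swallowIOException = */ true);
      Closeables.close(socket, /* swallowIOException = */ true);
    }
  }
}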
diff --git a/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java b/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
index dc5fa1cee6..50a324e293 100644
--- a/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import com.google.common.io.Closeables;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -78,10 +79,15 @@ public class ChunkFetchIntegrationSuite {
testFile = File.createTempFile("shuffle-test-file", "txt");
testFile.deleteOnExit();
RandomAccessFile fp = new RandomAccessFile(testFile, "rw");
- byte[] fileContent = new byte[1024];
- new Random().nextBytes(fileContent);
- fp.write(fileContent);
- fp.close();
+ boolean shouldSuppressIOException = true;
+ try {
+ byte[] fileContent = new byte[1024];
+ new Random().nextBytes(fileContent);
+ fp.write(fileContent);
+ shouldSuppressIOException = false;
+ } finally {
+ Closeables.close(fp, shouldSuppressIOException);
+ }
final TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
fileChunk = new FileSegmentManagedBuffer(conf, testFile, 10, testFile.length() - 25);
@@ -117,6 +123,7 @@ public class ChunkFetchIntegrationSuite {
@AfterClass
public static void tearDown() {
+ bufferChunk.release();
server.close();
clientFactory.close();
testFile.delete();
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
index 3fdde054ab..7ac1ca128a 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
@@ -23,6 +23,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import com.google.common.io.Closeables;
import com.google.common.io.Files;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
@@ -60,21 +61,28 @@ public class TestShuffleDataContext {
public void insertSortShuffleData(int shuffleId, int mapId, byte[][] blocks) throws IOException {
String blockId = "shuffle_" + shuffleId + "_" + mapId + "_0";
- OutputStream dataStream = new FileOutputStream(
- ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".data"));
- DataOutputStream indexStream = new DataOutputStream(new FileOutputStream(
- ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".index")));
+ OutputStream dataStream = null;
+ DataOutputStream indexStream = null;
+ boolean suppressExceptionsDuringClose = true;
- long offset = 0;
- indexStream.writeLong(offset);
- for (byte[] block : blocks) {
- offset += block.length;
- dataStream.write(block);
+ try {
+ dataStream = new FileOutputStream(
+ ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".data"));
+ indexStream = new DataOutputStream(new FileOutputStream(
+ ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".index")));
+
+ long offset = 0;
indexStream.writeLong(offset);
+ for (byte[] block : blocks) {
+ offset += block.length;
+ dataStream.write(block);
+ indexStream.writeLong(offset);
+ }
+ suppressExceptionsDuringClose = false;
+ } finally {
+ Closeables.close(dataStream, suppressExceptionsDuringClose);
+ Closeables.close(indexStream, suppressExceptionsDuringClose);
}
-
- dataStream.close();
- indexStream.close();
}
/** Creates reducer blocks in a hash-based data format within our local dirs. */
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
index ec2bffd6a5..7a8ef9d147 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
@@ -23,6 +23,7 @@ import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import static org.junit.Assert.*;
+import com.google.common.io.Closeables;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -121,14 +122,19 @@ public class JavaReceiverAPISuite implements Serializable {
private void receive() {
try {
- Socket socket = new Socket(host, port);
- BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
- String userInput;
- while ((userInput = in.readLine()) != null) {
- store(userInput);
+ Socket socket = null;
+ BufferedReader in = null;
+ try {
+ socket = new Socket(host, port);
+ in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+ String userInput;
+ while ((userInput = in.readLine()) != null) {
+ store(userInput);
+ }
+ } finally {
+ Closeables.close(in, /* swallowIOException = */ true);
+ Closeables.close(socket, /* swallowIOException = */ true);
}
- in.close();
- socket.close();
} catch(ConnectException ce) {
ce.printStackTrace();
restart("Could not connect", ce);