author      Ryan Blue <blue@apache.org>          2016-07-08 12:37:26 -0700
committer   Reynold Xin <rxin@databricks.com>    2016-07-08 12:37:26 -0700
commit      67e085ef6dd62774095f3187844c091db1a6a72c (patch)
tree        f403345e93f19a58bed6eb35151a9e9b90d6c2e0 /core
parent      38cf8f2a50068f80350740ac28e31c8accd20634 (diff)
[SPARK-16420] Ensure compression streams are closed.
## What changes were proposed in this pull request?

This uses the try/finally pattern to ensure streams are closed after use. `UnsafeShuffleWriter` wasn't closing compression streams, causing them to leak resources until garbage collected. This was causing a problem with codecs that use off-heap memory.

## How was this patch tested?

Current tests are sufficient. This should not change behavior.

Author: Ryan Blue <blue@apache.org>

Closes #14093 from rdblue/SPARK-16420-unsafe-shuffle-writer-leak.
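As a concrete illustration of the close-in-finally pattern the message describes, here is a minimal Scala sketch of compressing a byte array while guaranteeing the codec's stream is closed even if the write fails. The object and method names (`CompressionCloseSketch`, `compressBytes`, `payload`) are illustrative, not part of the patch:

```scala
import java.io.ByteArrayOutputStream

import org.apache.spark.io.CompressionCodec

object CompressionCloseSketch {
  // Illustrative only: some codecs back their streams with off-heap buffers,
  // so an unclosed stream holds those resources until it is garbage collected.
  def compressBytes(codec: CompressionCodec, payload: Array[Byte]): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val out = codec.compressedOutputStream(bos)
    try {
      out.write(payload)
    } finally {
      out.close() // always runs, so the codec's resources are released promptly
    }
    bos.toByteArray
  }
}
```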
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java   | 17
-rw-r--r--  core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala       | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala | 15
3 files changed, 34 insertions, 11 deletions
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 05fa04c44d..08fb887bbd 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -349,12 +349,19 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
         for (int i = 0; i < spills.length; i++) {
           final long partitionLengthInSpill = spills[i].partitionLengths[partition];
           if (partitionLengthInSpill > 0) {
-            InputStream partitionInputStream =
-              new LimitedInputStream(spillInputStreams[i], partitionLengthInSpill);
-            if (compressionCodec != null) {
-              partitionInputStream = compressionCodec.compressedInputStream(partitionInputStream);
+            InputStream partitionInputStream = null;
+            boolean innerThrewException = true;
+            try {
+              partitionInputStream =
+                new LimitedInputStream(spillInputStreams[i], partitionLengthInSpill, false);
+              if (compressionCodec != null) {
+                partitionInputStream = compressionCodec.compressedInputStream(partitionInputStream);
+              }
+              ByteStreams.copy(partitionInputStream, mergedFileOutputStream);
+              innerThrewException = false;
+            } finally {
+              Closeables.close(partitionInputStream, innerThrewException);
             }
-            ByteStreams.copy(partitionInputStream, mergedFileOutputStream);
           }
         }
         mergedFileOutputStream.flush();
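For readers unfamiliar with it, Guava's `Closeables.close(closeable, swallowIOException)` logs a close-time `IOException` instead of rethrowing it when the flag is true. That is why `innerThrewException` starts out true and is cleared only after the copy succeeds: a failure while closing cannot then mask the exception that aborted the copy, while a close failure after a successful copy still surfaces. A hedged Scala sketch of the same control flow (the names `CloseablesSketch` and `copyThenClose` are illustrative, not part of the patch):

```scala
import java.io.{InputStream, OutputStream}

import com.google.common.io.{ByteStreams, Closeables}

object CloseablesSketch {
  // Illustrative only: copy a stream and close it without letting a failing
  // close() hide the exception that interrupted the copy itself.
  def copyThenClose(open: () => InputStream, out: OutputStream): Unit = {
    var in: InputStream = null
    var threw = true                // assume failure until the copy completes
    try {
      in = open()
      ByteStreams.copy(in, out)
      threw = false                 // copy succeeded; a close failure should now propagate
    } finally {
      Closeables.close(in, threw)   // swallows (and logs) IOException only if the body threw
    }
  }
}
```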
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 632b0ae9c2..e8d6d587b4 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -232,7 +232,11 @@ private object TorrentBroadcast extends Logging {
     val out = compressionCodec.map(c => c.compressedOutputStream(cbbos)).getOrElse(cbbos)
     val ser = serializer.newInstance()
     val serOut = ser.serializeStream(out)
-    serOut.writeObject[T](obj).close()
+    Utils.tryWithSafeFinally {
+      serOut.writeObject[T](obj)
+    } {
+      serOut.close()
+    }
     cbbos.toChunkedByteBuffer.getChunks()
   }
@@ -246,8 +250,11 @@ private object TorrentBroadcast extends Logging {
     val in: InputStream = compressionCodec.map(c => c.compressedInputStream(is)).getOrElse(is)
     val ser = serializer.newInstance()
     val serIn = ser.deserializeStream(in)
-    val obj = serIn.readObject[T]()
-    serIn.close()
+    val obj = Utils.tryWithSafeFinally {
+      serIn.readObject[T]()
+    } {
+      serIn.close()
+    }
     obj
   }
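The Scala changes rely on `Utils.tryWithSafeFinally`, a Spark-internal helper whose purpose is that an exception thrown by the finally block does not mask an exception thrown by the body. Roughly, its semantics are as in the following sketch (an illustrative re-implementation, not Spark's actual code):

```scala
object TryWithSafeFinallySketch {
  // Illustrative only: run `block`, always run `finallyBlock`, and if both throw,
  // attach the close-time failure as a suppressed exception instead of letting
  // it mask the original one.
  def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
    var original: Throwable = null
    try {
      block
    } catch {
      case t: Throwable =>
        original = t
        throw t
    } finally {
      try {
        finallyBlock
      } catch {
        // Swallow the finally-block failure only when the body already failed;
        // otherwise it propagates normally.
        case t: Throwable if original != null =>
          original.addSuppressed(t)
      }
    }
  }
}
```

Usage then mirrors the hunks above, e.g. `tryWithSafeFinally { serOut.writeObject[T](obj) } { serOut.close() }`.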
diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
index d17a7894fd..f0ed41f690 100644
--- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
@@ -32,6 +32,7 @@ import org.apache.commons.io.IOUtils

 import org.apache.spark.{SparkEnv, SparkException}
 import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils

 /**
  * Custom serializer used for generic Avro records. If the user registers the schemas
@@ -72,8 +73,11 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
   def compress(schema: Schema): Array[Byte] = compressCache.getOrElseUpdate(schema, {
     val bos = new ByteArrayOutputStream()
     val out = codec.compressedOutputStream(bos)
-    out.write(schema.toString.getBytes(StandardCharsets.UTF_8))
-    out.close()
+    Utils.tryWithSafeFinally {
+      out.write(schema.toString.getBytes(StandardCharsets.UTF_8))
+    } {
+      out.close()
+    }
     bos.toByteArray
   })
@@ -86,7 +90,12 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
       schemaBytes.array(),
       schemaBytes.arrayOffset() + schemaBytes.position(),
       schemaBytes.remaining())
-    val bytes = IOUtils.toByteArray(codec.compressedInputStream(bis))
+    val in = codec.compressedInputStream(bis)
+    val bytes = Utils.tryWithSafeFinally {
+      IOUtils.toByteArray(in)
+    } {
+      in.close()
+    }
     new Schema.Parser().parse(new String(bytes, StandardCharsets.UTF_8))
   })