diff options
author | Kousuke Saruta <sarutak@oss.nttdata.co.jp> | 2014-09-16 12:41:45 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-09-16 12:41:45 -0700 |
commit | a9e910430fb6bb4ef1f6ae20761c43b96bb018df (patch) | |
tree | 0d7370b73041b90ba622aa172e670b819166c621 /core/src | |
parent | 84073eb1172dc959936149265378f6e24d303685 (diff) | |
download | spark-a9e910430fb6bb4ef1f6ae20761c43b96bb018df.tar.gz spark-a9e910430fb6bb4ef1f6ae20761c43b96bb018df.tar.bz2 spark-a9e910430fb6bb4ef1f6ae20761c43b96bb018df.zip |
[SPARK-3546] InputStream of ManagedBuffer is not closed and causes running out of file descriptor
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Closes #2408 from sarutak/resolve-resource-leak-issue and squashes the following commits:
074781d [Kousuke Saruta] Modified SuffleBlockFetcherIterator
5f63f67 [Kousuke Saruta] Move metrics increment logic and debug logging outside try block
b37231a [Kousuke Saruta] Modified FileSegmentManagedBuffer#nioByteBuffer to check null or not before invoking channel.close
bf29d4a [Kousuke Saruta] Modified FileSegment to close channel
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala | 12 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala | 2 |
2 files changed, 11 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala index dcecb6beee..e990c1da67 100644 --- a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala +++ b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala @@ -19,6 +19,7 @@ package org.apache.spark.network import java.io.{FileInputStream, RandomAccessFile, File, InputStream} import java.nio.ByteBuffer +import java.nio.channels.FileChannel import java.nio.channels.FileChannel.MapMode import com.google.common.io.ByteStreams @@ -66,8 +67,15 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt override def size: Long = length override def nioByteBuffer(): ByteBuffer = { - val channel = new RandomAccessFile(file, "r").getChannel - channel.map(MapMode.READ_ONLY, offset, length) + var channel: FileChannel = null + try { + channel = new RandomAccessFile(file, "r").getChannel + channel.map(MapMode.READ_ONLY, offset, length) + } finally { + if (channel != null) { + channel.close() + } + } } override def inputStream(): InputStream = { diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index c8e708aa6b..d868758a7f 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashSet import scala.collection.mutable.Queue -import org.apache.spark.{TaskContext, Logging, SparkException} +import org.apache.spark.{TaskContext, Logging} import org.apache.spark.network.{ManagedBuffer, BlockFetchingListener, BlockTransferService} import org.apache.spark.serializer.Serializer import org.apache.spark.util.Utils |