author     Kousuke Saruta <sarutak@oss.nttdata.co.jp>    2014-09-16 12:41:45 -0700
committer  Reynold Xin <rxin@apache.org>                 2014-09-16 12:41:45 -0700
commit     a9e910430fb6bb4ef1f6ae20761c43b96bb018df (patch)
tree       0d7370b73041b90ba622aa172e670b819166c621
parent     84073eb1172dc959936149265378f6e24d303685 (diff)
[SPARK-3546] InputStream of ManagedBuffer is not closed and causes the process to run out of file descriptors
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>

Closes #2408 from sarutak/resolve-resource-leak-issue and squashes the following commits:

074781d [Kousuke Saruta] Modified ShuffleBlockFetcherIterator
5f63f67 [Kousuke Saruta] Move metrics increment logic and debug logging outside try block
b37231a [Kousuke Saruta] Modified FileSegmentManagedBuffer#nioByteBuffer to check for null before invoking channel.close
bf29d4a [Kousuke Saruta] Modified FileSegment to close channel
-rw-r--r--  core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala               | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala |  2
2 files changed, 11 insertions(+), 3 deletions(-)
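For context, a minimal sketch of the failure mode this patch addresses (a hypothetical standalone example, not code from the repository): each call opens a FileChannel that is never closed, so every shuffle fetch consumes one file descriptor until the process hits its ulimit and reads start failing with "Too many open files".

import java.io.{File, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode

// Pre-fix shape of FileSegmentManagedBuffer#nioByteBuffer: the channel
// opened here is never closed, so every call leaks one file descriptor.
def leakyNioByteBuffer(file: File, offset: Long, length: Long): ByteBuffer = {
  val channel = new RandomAccessFile(file, "r").getChannel
  channel.map(MapMode.READ_ONLY, offset, length) // mapping is returned; channel is abandoned
}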
diff --git a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
index dcecb6beee..e990c1da67 100644
--- a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
@@ -19,6 +19,7 @@ package org.apache.spark.network
 
 import java.io.{FileInputStream, RandomAccessFile, File, InputStream}
 import java.nio.ByteBuffer
+import java.nio.channels.FileChannel
 import java.nio.channels.FileChannel.MapMode
 
 import com.google.common.io.ByteStreams
@@ -66,8 +67,15 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
 
   override def size: Long = length
 
   override def nioByteBuffer(): ByteBuffer = {
-    val channel = new RandomAccessFile(file, "r").getChannel
-    channel.map(MapMode.READ_ONLY, offset, length)
+    var channel: FileChannel = null
+    try {
+      channel = new RandomAccessFile(file, "r").getChannel
+      channel.map(MapMode.READ_ONLY, offset, length)
+    } finally {
+      if (channel != null) {
+        channel.close()
+      }
+    }
   }
 
   override def inputStream(): InputStream = {
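Closing the channel in `finally` is safe here because, per the JDK contract for FileChannel.map, the returned MappedByteBuffer remains valid after its channel is closed. The same idiom can be factored into a loan-pattern helper; the sketch below is a hypothetical generalization of the patch, not code from it:

import java.io.{File, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.channels.FileChannel.MapMode

// Hypothetical helper: opens a read-only channel, lends it to `body`,
// and always closes it, so no call site can leak the descriptor.
def withReadOnlyChannel[T](file: File)(body: FileChannel => T): T = {
  val channel = new RandomAccessFile(file, "r").getChannel
  try body(channel) finally channel.close()
}

// nioByteBuffer could then be a one-liner; the mapping outlives the channel.
def mapSegment(file: File, offset: Long, length: Long): ByteBuffer =
  withReadOnlyChannel(file)(_.map(MapMode.READ_ONLY, offset, length))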
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index c8e708aa6b..d868758a7f 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashSet
 import scala.collection.mutable.Queue
 
-import org.apache.spark.{TaskContext, Logging, SparkException}
+import org.apache.spark.{TaskContext, Logging}
 import org.apache.spark.network.{ManagedBuffer, BlockFetchingListener, BlockTransferService}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util.Utils
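The InputStream leak named in the commit title is the consumer-side half of the same problem: whoever reads a ManagedBuffer's stream must close it, or the backing descriptor stays open. A minimal sketch of that pattern, assuming the inputStream() API shown above and Guava's ByteStreams (already imported by ManagedBuffer.scala); the readBlock helper is hypothetical, not part of this patch:

import java.io.InputStream

import com.google.common.io.ByteStreams

// Read a block fully and always release the underlying descriptor,
// even if the read throws (e.g. on a truncated shuffle file).
def readBlock(in: InputStream): Array[Byte] = {
  try {
    ByteStreams.toByteArray(in)
  } finally {
    in.close()
  }
}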