aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala2
2 files changed, 11 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
index dcecb6beee..e990c1da67 100644
--- a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
@@ -19,6 +19,7 @@ package org.apache.spark.network
import java.io.{FileInputStream, RandomAccessFile, File, InputStream}
import java.nio.ByteBuffer
+import java.nio.channels.FileChannel
import java.nio.channels.FileChannel.MapMode
import com.google.common.io.ByteStreams
@@ -66,8 +67,15 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
override def size: Long = length
override def nioByteBuffer(): ByteBuffer = {
- val channel = new RandomAccessFile(file, "r").getChannel
- channel.map(MapMode.READ_ONLY, offset, length)
+ var channel: FileChannel = null
+ try {
+ channel = new RandomAccessFile(file, "r").getChannel
+ channel.map(MapMode.READ_ONLY, offset, length)
+ } finally {
+ if (channel != null) {
+ channel.close()
+ }
+ }
}
override def inputStream(): InputStream = {
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index c8e708aa6b..d868758a7f 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashSet
import scala.collection.mutable.Queue
-import org.apache.spark.{TaskContext, Logging, SparkException}
+import org.apache.spark.{TaskContext, Logging}
import org.apache.spark.network.{ManagedBuffer, BlockFetchingListener, BlockTransferService}
import org.apache.spark.serializer.Serializer
import org.apache.spark.util.Utils