aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorjerryshao <saisai.shao@intel.com>2015-05-11 14:38:58 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-05-11 14:38:58 -0700
commit25c01c54840a9ab768f8b917de7edc2bc2d61b9e (patch)
treeecb0f9aeaa6b0940992c56cd7231e161faa38058 /streaming
parent8e674331d9ce98068b44e4d483b6d35cef0648fa (diff)
downloadspark-25c01c54840a9ab768f8b917de7edc2bc2d61b9e.tar.gz
spark-25c01c54840a9ab768f8b917de7edc2bc2d61b9e.tar.bz2
spark-25c01c54840a9ab768f8b917de7edc2bc2d61b9e.zip
[STREAMING] [MINOR] Close files correctly when iterator is finished in streaming WAL recovery
Currently there's no chance to close the file correctly after the iteration is finished; change to `CompletionIterator` to avoid resource leakage. Author: jerryshao <saisai.shao@intel.com> Closes #6050 from jerryshao/close-file-correctly and squashes the following commits: 52dfaf5 [jerryshao] Close files correctly when iterator is finished
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala5
1 file changed, 3 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 9985fedc35..87ba4f84a9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -26,7 +26,7 @@ import scala.language.postfixOps
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.{CompletionIterator, ThreadUtils}
import org.apache.spark.{Logging, SparkConf}
/**
@@ -124,7 +124,8 @@ private[streaming] class FileBasedWriteAheadLog(
logFilesToRead.iterator.map { file =>
logDebug(s"Creating log reader with $file")
- new FileBasedWriteAheadLogReader(file, hadoopConf)
+ val reader = new FileBasedWriteAheadLogReader(file, hadoopConf)
+ CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, reader.close _)
} flatMap { x => x }
}