diff options
author | jerryshao <saisai.shao@intel.com> | 2015-05-11 14:38:58 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-05-11 14:39:08 -0700 |
commit | 9e226e1c8fdf5f928371ccbe06e7585367e1305a (patch) | |
tree | d93a76734d120cc7e7f396e5f943bc1f49997ea7 | |
parent | 1538b10e8b0d681dea4f2718c272a46593fd165e (diff) | |
download | spark-9e226e1c8fdf5f928371ccbe06e7585367e1305a.tar.gz spark-9e226e1c8fdf5f928371ccbe06e7585367e1305a.tar.bz2 spark-9e226e1c8fdf5f928371ccbe06e7585367e1305a.zip |
[STREAMING] [MINOR] Close files correctly when iterator is finished in streaming WAL recovery
Currently there is no opportunity to close the file correctly after the iteration is finished; switch to `CompletionIterator` to avoid resource leakage.
Author: jerryshao <saisai.shao@intel.com>
Closes #6050 from jerryshao/close-file-correctly and squashes the following commits:
52dfaf5 [jerryshao] Close files correctly when iterator is finished
(cherry picked from commit 25c01c54840a9ab768f8b917de7edc2bc2d61b9e)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala | 5 |
1 file changed, 3 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala index 9985fedc35..87ba4f84a9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -26,7 +26,7 @@ import scala.language.postfixOps import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.spark.util.ThreadUtils +import org.apache.spark.util.{CompletionIterator, ThreadUtils} import org.apache.spark.{Logging, SparkConf} /** @@ -124,7 +124,8 @@ private[streaming] class FileBasedWriteAheadLog( logFilesToRead.iterator.map { file => logDebug(s"Creating log reader with $file") - new FileBasedWriteAheadLogReader(file, hadoopConf) + val reader = new FileBasedWriteAheadLogReader(file, hadoopConf) + CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, reader.close _) } flatMap { x => x } } |