aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala8
1 files changed, 6 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index fca3d51535..069e41b6ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -175,8 +175,12 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
val batchMetadataFile = batchIdToPath(batchId)
if (fileManager.exists(batchMetadataFile)) {
val input = fileManager.open(batchMetadataFile)
- val bytes = IOUtils.toByteArray(input)
- Some(deserialize(bytes))
+ try {
+ val bytes = IOUtils.toByteArray(input)
+ Some(deserialize(bytes))
+ } finally {
+ input.close()
+ }
} else {
logDebug(s"Unable to find batch $batchMetadataFile")
None