about summary refs log tree commit diff
path: root/sql
diff options
context:
space:
mode:
authorfrreiss <frreiss@us.ibm.com>2016-11-01 23:00:17 -0700
committerReynold Xin <rxin@databricks.com>2016-11-01 23:00:17 -0700
commit620da3b4828b3580c7ed7339b2a07938e6be1bb1 (patch)
tree7aafcf510be21ff1bbed56377417a78025f8cdc4 /sql
parent1bbf9ff634745148e782370009aa31d3a042638c (diff)
downloadspark-620da3b4828b3580c7ed7339b2a07938e6be1bb1.tar.gz
spark-620da3b4828b3580c7ed7339b2a07938e6be1bb1.tar.bz2
spark-620da3b4828b3580c7ed7339b2a07938e6be1bb1.zip
[SPARK-17475][STREAMING] Delete CRC files if the filesystem doesn't use checksum files
## What changes were proposed in this pull request? When the metadata logs for various parts of Structured Streaming are stored on non-HDFS filesystems such as NFS or ext4, the HDFSMetadataLog class leaves hidden HDFS-style checksum (CRC) files in the log directory, one file per batch. This PR modifies HDFSMetadataLog so that it detects the use of a filesystem that doesn't use CRC files and removes the CRC files. ## How was this patch tested? Modified an existing test case in HDFSMetadataLogSuite to check whether HDFSMetadataLog correctly removes CRC files on the local POSIX filesystem. Ran the entire regression suite. Author: frreiss <frreiss@us.ibm.com> Closes #15027 from frreiss/fred-17475.
Diffstat (limited to 'sql')
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala | 5
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala | 6
2 files changed, 11 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index c7235320fd..9a0f87cf04 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -148,6 +148,11 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
// It will fail if there is an existing file (someone has committed the batch)
logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
fileManager.rename(tempPath, batchIdToPath(batchId))
+
+ // SPARK-17475: HDFSMetadataLog should not leak CRC files
+ // If the underlying filesystem didn't rename the CRC file, delete it.
+ val crcPath = new Path(tempPath.getParent(), s".${tempPath.getName()}.crc")
+ if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
return
} catch {
case e: IOException if isFileAlreadyExistsException(e) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 9c1d26dcb2..d03e08d9a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -119,6 +119,12 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
assert(metadataLog.get(1).isEmpty)
assert(metadataLog.get(2).isDefined)
assert(metadataLog.getLatest().get._1 == 2)
+
+ // There should be exactly one file, called "2", in the metadata directory.
+ // This check also tests for regressions of SPARK-17475
+ val allFiles = new File(metadataLog.metadataPath.toString).listFiles().toSeq
+ assert(allFiles.size == 1)
+ assert(allFiles(0).getName() == "2")
}
}