diff options
author | frreiss <frreiss@us.ibm.com> | 2016-11-01 23:00:17 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-11-01 23:00:17 -0700 |
commit | 620da3b4828b3580c7ed7339b2a07938e6be1bb1 (patch) | |
tree | 7aafcf510be21ff1bbed56377417a78025f8cdc4 /sql | |
parent | 1bbf9ff634745148e782370009aa31d3a042638c (diff) | |
download | spark-620da3b4828b3580c7ed7339b2a07938e6be1bb1.tar.gz spark-620da3b4828b3580c7ed7339b2a07938e6be1bb1.tar.bz2 spark-620da3b4828b3580c7ed7339b2a07938e6be1bb1.zip |
[SPARK-17475][STREAMING] Delete CRC files if the filesystem doesn't use checksum files
## What changes were proposed in this pull request?
When the metadata logs for various parts of Structured Streaming are stored on non-HDFS filesystems such as NFS or ext4, the HDFSMetadataLog class leaves hidden HDFS-style checksum (CRC) files in the log directory, one file per batch. This PR modifies HDFSMetadataLog so that it detects the use of a filesystem that doesn't use CRC files and removes the CRC files.
## How was this patch tested?
Modified an existing test case in HDFSMetadataLogSuite to check whether HDFSMetadataLog correctly removes CRC files on the local POSIX filesystem. Ran the entire regression suite.
Author: frreiss <frreiss@us.ibm.com>
Closes #15027 from frreiss/fred-17475.
Diffstat (limited to 'sql')
2 files changed, 11 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index c7235320fd..9a0f87cf04 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -148,6 +148,11 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String) // It will fail if there is an existing file (someone has committed the batch) logDebug(s"Attempting to write log #${batchIdToPath(batchId)}") fileManager.rename(tempPath, batchIdToPath(batchId)) + + // SPARK-17475: HDFSMetadataLog should not leak CRC files + // If the underlying filesystem didn't rename the CRC file, delete it. + val crcPath = new Path(tempPath.getParent(), s".${tempPath.getName()}.crc") + if (fileManager.exists(crcPath)) fileManager.delete(crcPath) return } catch { case e: IOException if isFileAlreadyExistsException(e) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala index 9c1d26dcb2..d03e08d9a5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala @@ -119,6 +119,12 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext { assert(metadataLog.get(1).isEmpty) assert(metadataLog.get(2).isDefined) assert(metadataLog.getLatest().get._1 == 2) + + // There should be exactly one file, called "2", in the metadata directory. + // This check also tests for regressions of SPARK-17475 + val allFiles = new File(metadataLog.metadataPath.toString).listFiles().toSeq + assert(allFiles.size == 1) + assert(allFiles(0).getName() == "2") } } |