diff options
author | uncleGen <hustyugm@gmail.com> | 2016-12-28 10:42:47 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-12-28 10:42:47 +0000 |
commit | 76e9bd74885a99462ed0957aad37cbead7f14de2 (patch) | |
tree | 6ca74db8ff1454df05c0709bf749d6fb907d80f3 /sql | |
parent | 67fb33e7e078eef3ecd5dcbfc26659b6fe2d054e (diff) | |
download | spark-76e9bd74885a99462ed0957aad37cbead7f14de2.tar.gz spark-76e9bd74885a99462ed0957aad37cbead7f14de2.tar.bz2 spark-76e9bd74885a99462ed0957aad37cbead7f14de2.zip |
[SPARK-18960][SQL][SS] Avoid double reading file which is being copied.
## What changes were proposed in this pull request?
In HDFS, when we copy a file into target directory, there will a temporary `._COPY_` file for a period of time. The duration depends on file size. If we do not skip this file, we will may read the same data for two times.
## How was this patch tested?
update unit test
Author: uncleGen <hustyugm@gmail.com>
Closes #16370 from uncleGen/SPARK-18960.
Diffstat (limited to 'sql')
2 files changed, 9 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index 825a0f70dd..82c1599a39 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -439,10 +439,15 @@ object PartitioningAwareFileIndex extends Logging { /** Checks if we should filter out this path name. */ def shouldFilterOut(pathName: String): Boolean = { - // We filter everything that starts with _ and ., except _common_metadata and _metadata + // We filter follow paths: + // 1. everything that starts with _ and ., except _common_metadata and _metadata // because Parquet needs to find those metadata files from leaf files returned by this method. // We should refactor this logic to not mix metadata files with data files. - ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) && - !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata") + // 2. everything that ends with `._COPYING_`, because this is a intermediate state of file. we + // should skip this file in case of double reading. + val exclude = (pathName.startsWith("_") && !pathName.contains("=")) || + pathName.startsWith(".") || pathName.endsWith("._COPYING_") + val include = pathName.startsWith("_common_metadata") || pathName.startsWith("_metadata") + exclude && !include } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index b7a472b7f0..2b4c9f3ed3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -142,6 +142,7 @@ class FileIndexSuite extends SharedSQLContext { assert(!PartitioningAwareFileIndex.shouldFilterOut("_common_metadata")) assert(PartitioningAwareFileIndex.shouldFilterOut("_ab_metadata")) assert(PartitioningAwareFileIndex.shouldFilterOut("_cd_common_metadata")) + assert(PartitioningAwareFileIndex.shouldFilterOut("a._COPYING_")) } test("SPARK-17613 - PartitioningAwareFileIndex: base path w/o '/' at end") { |