aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authoruncleGen <hustyugm@gmail.com>2016-12-28 10:42:47 +0000
committerSean Owen <sowen@cloudera.com>2016-12-28 10:42:47 +0000
commit76e9bd74885a99462ed0957aad37cbead7f14de2 (patch)
tree6ca74db8ff1454df05c0709bf749d6fb907d80f3 /sql
parent67fb33e7e078eef3ecd5dcbfc26659b6fe2d054e (diff)
downloadspark-76e9bd74885a99462ed0957aad37cbead7f14de2.tar.gz
spark-76e9bd74885a99462ed0957aad37cbead7f14de2.tar.bz2
spark-76e9bd74885a99462ed0957aad37cbead7f14de2.zip
[SPARK-18960][SQL][SS] Avoid double reading file which is being copied.
## What changes were proposed in this pull request? In HDFS, when we copy a file into target directory, there will a temporary `._COPY_` file for a period of time. The duration depends on file size. If we do not skip this file, we will may read the same data for two times. ## How was this patch tested? update unit test Author: uncleGen <hustyugm@gmail.com> Closes #16370 from uncleGen/SPARK-18960.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala11
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala1
2 files changed, 9 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 825a0f70dd..82c1599a39 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -439,10 +439,15 @@ object PartitioningAwareFileIndex extends Logging {
/** Checks if we should filter out this path name. */
def shouldFilterOut(pathName: String): Boolean = {
- // We filter everything that starts with _ and ., except _common_metadata and _metadata
+ // We filter follow paths:
+ // 1. everything that starts with _ and ., except _common_metadata and _metadata
// because Parquet needs to find those metadata files from leaf files returned by this method.
// We should refactor this logic to not mix metadata files with data files.
- ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) &&
- !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata")
+ // 2. everything that ends with `._COPYING_`, because this is a intermediate state of file. we
+ // should skip this file in case of double reading.
+ val exclude = (pathName.startsWith("_") && !pathName.contains("=")) ||
+ pathName.startsWith(".") || pathName.endsWith("._COPYING_")
+ val include = pathName.startsWith("_common_metadata") || pathName.startsWith("_metadata")
+ exclude && !include
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index b7a472b7f0..2b4c9f3ed3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -142,6 +142,7 @@ class FileIndexSuite extends SharedSQLContext {
assert(!PartitioningAwareFileIndex.shouldFilterOut("_common_metadata"))
assert(PartitioningAwareFileIndex.shouldFilterOut("_ab_metadata"))
assert(PartitioningAwareFileIndex.shouldFilterOut("_cd_common_metadata"))
+ assert(PartitioningAwareFileIndex.shouldFilterOut("a._COPYING_"))
}
test("SPARK-17613 - PartitioningAwareFileIndex: base path w/o '/' at end") {