aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorYin Huai <yhuai@databricks.com>2016-03-14 09:03:13 -0700
committerYin Huai <yhuai@databricks.com>2016-03-14 09:03:13 -0700
commit250832c733602bcca034dd1fea6e325c55ccd1cd (patch)
tree5ae8ae14e276e0f1445760185bba2de3d76bd196 /sql
parent31d069d4c2956306355d14087ca74ce1e6705217 (diff)
downloadspark-250832c733602bcca034dd1fea6e325c55ccd1cd.tar.gz
spark-250832c733602bcca034dd1fea6e325c55ccd1cd.tar.bz2
spark-250832c733602bcca034dd1fea6e325c55ccd1cd.zip
[SPARK-13207][SQL] Make partitioning discovery ignore _SUCCESS files.
If a _SUCCESS appears in the inner partitioning dir, partition discovery will treat that _SUCCESS file as a data file. Then, partition discovery will fail because it finds that the dir structure is not valid. We should ignore those `_SUCCESS` files. In future, it is better to ignore all files/dirs starting with `_` or `.`. This PR does not make this change. I am thinking about making this change simple, so we can consider of getting it in branch 1.6. To ignore all files/dirs starting with `_` or `, the main change is to let ParquetRelation have another way to get metadata files. Right now, it relies on FileStatusCache's cachedLeafStatuses, which returns file statuses of both metadata files (e.g. metadata files used by parquet) and data files, which requires more changes. https://issues.apache.org/jira/browse/SPARK-13207 Author: Yin Huai <yhuai@databricks.com> Closes #11088 from yhuai/SPARK-13207.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala30
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala23
2 files changed, 44 insertions, 9 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index be2d98c46d..601f944fb6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -522,7 +522,7 @@ class HDFSFileCatalog(
}
}.filterNot { status =>
val name = status.getPath.getName
- name.toLowerCase == "_temporary" || name.startsWith(".")
+ HadoopFsRelation.shouldFilterOut(name)
}
val (dirs, files) = statuses.partition(_.isDirectory)
@@ -616,6 +616,16 @@ class HDFSFileCatalog(
* Helper methods for gathering metadata from HDFS.
*/
private[sql] object HadoopFsRelation extends Logging {
+
+ /** Checks if we should filter out this path name. */
+ def shouldFilterOut(pathName: String): Boolean = {
+ // TODO: We should try to filter out all files/dirs starting with "." or "_".
+ // The only reason that we are not doing it now is that Parquet needs to find those
+ // metadata files from leaf files returned by this methods. We should refactor
+ // this logic to not mix metadata files with data files.
+ pathName == "_SUCCESS" || pathName == "_temporary" || pathName.startsWith(".")
+ }
+
// We don't filter files/directories whose name start with "_" except "_temporary" here, as
// specific data sources may take advantages over them (e.g. Parquet _metadata and
// _common_metadata files). "_temporary" directories are explicitly ignored since failed
@@ -624,19 +634,21 @@ private[sql] object HadoopFsRelation extends Logging {
def listLeafFiles(fs: FileSystem, status: FileStatus): Array[FileStatus] = {
logInfo(s"Listing ${status.getPath}")
val name = status.getPath.getName.toLowerCase
- if (name == "_temporary" || name.startsWith(".")) {
+ if (shouldFilterOut(name)) {
Array.empty
} else {
// Dummy jobconf to get to the pathFilter defined in configuration
val jobConf = new JobConf(fs.getConf, this.getClass())
val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
- if (pathFilter != null) {
- val (dirs, files) = fs.listStatus(status.getPath, pathFilter).partition(_.isDirectory)
- files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
- } else {
- val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
- files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
- }
+ val statuses =
+ if (pathFilter != null) {
+ val (dirs, files) = fs.listStatus(status.getPath, pathFilter).partition(_.isDirectory)
+ files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
+ } else {
+ val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
+ files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
+ }
+ statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index b74b9d3f3b..026191528e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -706,6 +706,29 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
}
}
+ test("_SUCCESS should not break partitioning discovery") {
+ Seq(1, 32).foreach { threshold =>
+ // We have two paths to list files, one at driver side, another one that we use
+ // a Spark job. We need to test both ways.
+ withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> threshold.toString) {
+ withTempPath { dir =>
+ val tablePath = new File(dir, "table")
+ val df = (1 to 3).map(i => (i, i, i, i)).toDF("a", "b", "c", "d")
+
+ df.write
+ .format("parquet")
+ .partitionBy("b", "c", "d")
+ .save(tablePath.getCanonicalPath)
+
+ Files.touch(new File(s"${tablePath.getCanonicalPath}/b=1", "_SUCCESS"))
+ Files.touch(new File(s"${tablePath.getCanonicalPath}/b=1/c=1", "_SUCCESS"))
+ Files.touch(new File(s"${tablePath.getCanonicalPath}/b=1/c=1/d=1", "_SUCCESS"))
+ checkAnswer(sqlContext.read.format("parquet").load(tablePath.getCanonicalPath), df)
+ }
+ }
+ }
+ }
+
test("listConflictingPartitionColumns") {
def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]): String = {
val conflictingColNameLists = colNameLists.zipWithIndex.map { case (list, index) =>