author    Cheng Lian <lian@databricks.com>  2016-06-14 12:13:12 -0700
committer Yin Huai <yhuai@databricks.com>   2016-06-14 12:13:12 -0700
commit    bd39ffe35c6f939debe5d3c5eb4970b4e62507b0 (patch)
tree      074e4058ca27588db489901c3c7720d79530541a
parent    df4ea6614d709ee66f1ceb966df6216b125b8ea1 (diff)
[SPARK-15895][SQL] Filters out metadata files while doing partition discovery
## What changes were proposed in this pull request?

Take the following directory layout as an example:

```
dir/
+- p0=0/
   |- _metadata
   +- p1=0/
      |- part-00001.parquet
      |- part-00002.parquet
      |- ...
```

The `_metadata` file under `p0=0` shouldn't fail partition discovery. This PR filters out all metadata files whose names start with `_` while doing partition discovery.

## How was this patch tested?

New unit test added in `ParquetPartitionDiscoverySuite`.

Author: Cheng Lian <lian@databricks.com>

Closes #13623 from liancheng/spark-15895-partition-disco-no-metafiles.
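To make the rule concrete, here is a minimal standalone sketch of the filter this patch applies during partition discovery. It mirrors the `isDataPath` helper added in the diff below; the object name and sample file names are illustrative only, not part of Spark.

```scala
// Minimal sketch of the data-file predicate introduced by this patch: files
// whose names start with "_" (summary/metadata files) or "." (hidden or
// temporary files) are ignored during partition discovery.
object DataPathFilterSketch {
  def isDataPath(name: String): Boolean =
    !(name.startsWith("_") || name.startsWith("."))

  def main(args: Array[String]): Unit = {
    val children = Seq("_metadata", "_common_metadata", ".dummy", "part-00001.parquet")
    // Only the Parquet part file counts as data.
    println(children.filter(isDataPath)) // List(part-00001.parquet)
  }
}
```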
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala  15
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala  44
3 files changed, 60 insertions, 6 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index 7d2854aaad..d96cf1bf07 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources
import scala.collection.mutable
import scala.util.Try
-import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import org.apache.spark.sql.SparkSession
@@ -83,8 +83,9 @@ class ListingFileCatalog(
val statuses: Seq[FileStatus] = paths.flatMap { path =>
val fs = path.getFileSystem(hadoopConf)
logInfo(s"Listing $path on driver")
- Try(HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).
- getOrElse(Array.empty)
+ Try {
+ HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
+ }.getOrElse(Array.empty[FileStatus])
}
mutable.LinkedHashSet(statuses: _*)
}
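Aside from the import cleanup, the hunk above also rewrites the one-liner into a `Try { ... }.getOrElse(...)` block with an explicitly typed empty fallback. A small hedged sketch of the same pattern follows; the helper and object names are hypothetical, not Spark API.

```scala
import scala.reflect.ClassTag
import scala.util.Try

// Hypothetical helper illustrating the pattern used above: evaluate a file
// listing lazily and fall back to an empty, correctly typed array when the
// listing throws (e.g. FileNotFoundException for a path that disappeared
// between enumeration and listing).
object ListOrEmptySketch {
  def listOrEmpty[A: ClassTag](listing: => Array[A]): Array[A] =
    Try(listing).getOrElse(Array.empty[A])
}
```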
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index 406d2e8e81..811e96c99a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -50,14 +50,14 @@ abstract class PartitioningAwareFileCatalog(
override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
- Partition(InternalRow.empty, allFiles().filterNot(_.getPath.getName startsWith "_")) :: Nil
+ Partition(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
} else {
prunePartitions(filters, partitionSpec()).map {
case PartitionDirectory(values, path) =>
val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
case Some(existingDir) =>
// Directory has children files in it, return them
- existingDir.filterNot(_.getPath.getName.startsWith("_"))
+ existingDir.filter(f => isDataPath(f.getPath))
case None =>
// Directory does not exist, or has no children files
@@ -96,7 +96,11 @@ abstract class PartitioningAwareFileCatalog(
protected def inferPartitioning(): PartitionSpec = {
// We use leaf dirs containing data files to discover the schema.
- val leafDirs = leafDirToChildrenFiles.keys.toSeq
+ val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
+ // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be
+ // counted as data files, so that they shouldn't participate in partition discovery.
+ files.exists(f => isDataPath(f.getPath))
+ }.keys.toSeq
partitionSchema match {
case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
val spec = PartitioningUtils.parsePartitions(
@@ -197,4 +201,9 @@ abstract class PartitioningAwareFileCatalog(
if (leafFiles.contains(qualifiedPath)) qualifiedPath.getParent else qualifiedPath }.toSet
}
}
+
+ private def isDataPath(path: Path): Boolean = {
+ val name = path.getName
+ !(name.startsWith("_") || name.startsWith("."))
+ }
}
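The key change in this file is that `inferPartitioning()` now keeps only leaf directories containing at least one non-metadata file. Below is a minimal sketch of that filter; the directory map, paths, and object name are illustrative only.

```scala
// Minimal sketch of the leaf-directory filter added to inferPartitioning():
// a directory takes part in partition discovery only if it holds at least
// one non-metadata file.
object LeafDirFilterSketch {
  def isDataFile(name: String): Boolean =
    !(name.startsWith("_") || name.startsWith("."))

  def main(args: Array[String]): Unit = {
    val leafDirToChildrenFiles: Map[String, Seq[String]] = Map(
      "dir/p0=0"      -> Seq("_metadata", "_common_metadata", ".dummy"),
      "dir/p0=0/p1=0" -> Seq("_metadata", "part-00001.parquet")
    )

    // Only "dir/p0=0/p1=0" survives, so stray summary files under "dir/p0=0"
    // no longer confuse partition column inference.
    val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
      files.exists(isDataFile)
    }.keys.toSeq

    println(leafDirs) // List(dir/p0=0/p1=0)
  }
}
```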
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index e19345529e..133ffedf12 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -25,11 +25,13 @@ import scala.collection.mutable.ArrayBuffer
import com.google.common.io.Files
import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitionDirectory => Partition, PartitioningUtils, PartitionSpec}
+import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
@@ -890,4 +892,46 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
}
}
}
+
+ test("SPARK-15895 summary files in non-leaf partition directories") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+
+ withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") {
+ spark.range(3).write.parquet(s"$path/p0=0/p1=0")
+ }
+
+ val p0 = new File(path, "p0=0")
+ val p1 = new File(p0, "p1=0")
+
+ // Builds the following directory layout by:
+ //
+ // 1. copying Parquet summary files we just wrote into `p0=0`, and
+ // 2. touching a dot-file `.dummy` under `p0=0`.
+ //
+ // <base>
+ // +- p0=0
+ // |- _metadata
+ // |- _common_metadata
+ // |- .dummy
+ // +- p1=0
+ // |- _metadata
+ // |- _common_metadata
+ // |- part-00000.parquet
+ // |- part-00001.parquet
+ // +- ...
+ //
+ // The summary files and the dot-file under `p0=0` should not fail partition discovery.
+
+ Files.copy(new File(p1, "_metadata"), new File(p0, "_metadata"))
+ Files.copy(new File(p1, "_common_metadata"), new File(p0, "_common_metadata"))
+ Files.touch(new File(p0, ".dummy"))
+
+ checkAnswer(spark.read.parquet(s"$path"), Seq(
+ Row(0, 0, 0),
+ Row(1, 0, 0),
+ Row(2, 0, 0)
+ ))
+ }
+ }
}
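For reference, the scenario covered by the new test can be approximated from `spark-shell` roughly as sketched below. The `/tmp` path is illustrative, and setting the Parquet summary option on the Hadoop configuration is an assumption about how to enable summary files outside the test harness.

```scala
import org.apache.parquet.hadoop.ParquetOutputFormat

// Hypothetical spark-shell reproduction of the tested scenario.
// Enable Parquet job summary files so _metadata/_common_metadata get written.
spark.sparkContext.hadoopConfiguration.set(ParquetOutputFormat.ENABLE_JOB_SUMMARY, "true")
spark.range(3).write.parquet("/tmp/spark-15895/p0=0/p1=0")

// After manually copying _metadata and _common_metadata from p1=0 up into
// p0=0 (as the test does with Files.copy), this read should still succeed and
// return (0,0,0), (1,0,0), (2,0,0) instead of failing partition discovery.
spark.read.parquet("/tmp/spark-15895").show()
```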