diff options
author | Cheng Lian <lian@databricks.com> | 2015-05-26 20:48:56 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2015-05-26 20:48:56 -0700 |
commit | b463e6d618e69c535297e51f41eca4f91bd33cc8 (patch) | |
tree | 211ed6394d8ee89e3944d2bb9076f07c5335f802 | |
parent | 0c33c7b4a66e47f6246f1b7f2b96f2c33126ec63 (diff) | |
download | spark-b463e6d618e69c535297e51f41eca4f91bd33cc8.tar.gz spark-b463e6d618e69c535297e51f41eca4f91bd33cc8.tar.bz2 spark-b463e6d618e69c535297e51f41eca4f91bd33cc8.zip |
[SPARK-7868] [SQL] Ignores _temporary directories in HadoopFsRelation
So that potential partial/corrupted data files left by failed tasks/jobs won't affect normal data scan.
Author: Cheng Lian <lian@databricks.com>
Closes #6411 from liancheng/spark-7868 and squashes the following commits:
273ea36 [Cheng Lian] Ignores _temporary directories
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala | 20 | ||||
-rw-r--r-- | sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala | 16 |
2 files changed, 29 insertions, 7 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index aaabbadcd6..c06026e042 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -31,7 +31,7 @@ import org.apache.spark.SerializableWritable import org.apache.spark.sql.{Row, _} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection -import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.sql.types.StructType /** * ::DeveloperApi:: @@ -378,16 +378,22 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]] def refresh(): Unit = { + // We don't filter files/directories whose name start with "_" or "." here, as specific data + // sources may take advantages over them (e.g. Parquet _metadata and _common_metadata files). + // But "_temporary" directories are explicitly ignored since failed tasks/jobs may leave + // partial/corrupted data files there. def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Set[FileStatus] = { - val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir) - val leafDirs = if (dirs.isEmpty) Set(status) else Set.empty[FileStatus] - files.toSet ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir)) + if (status.getPath.getName.toLowerCase == "_temporary") { + Set.empty + } else { + val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir) + val leafDirs = if (dirs.isEmpty) Set(status) else Set.empty[FileStatus] + files.toSet ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir)) + } } leafFiles.clear() - // We don't filter files/directories like _temporary/_SUCCESS here, as specific data sources - // may take advantages over them (e.g. Parquet _metadata and _common_metadata files). val statuses = paths.flatMap { path => val hdfsPath = new Path(path) val fs = hdfsPath.getFileSystem(hadoopConf) @@ -395,7 +401,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio Try(fs.getFileStatus(qualified)).toOption.toArray.flatMap(listLeafFilesAndDirs(fs, _)) } - val (dirs, files) = statuses.partition(_.isDir) + val files = statuses.filterNot(_.isDir) leafFiles ++= files.map(f => f.getPath -> f).toMap leafDirToChildrenFiles ++= files.groupBy(_.getPath.getParent) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala index 7c02d563f8..cf5ae88dc4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala @@ -548,4 +548,20 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { checkAnswer(table("t"), df.select('b, 'c, 'a).collect()) } } + + test("SPARK-7868: _temporary directories should be ignored") { + withTempPath { dir => + val df = Seq("a", "b", "c").zipWithIndex.toDF() + + df.write + .format("parquet") + .save(dir.getCanonicalPath) + + df.write + .format("parquet") + .save(s"${dir.getCanonicalPath}/_temporary") + + checkAnswer(read.format("parquet").load(dir.getCanonicalPath), df.collect()) + } + } } |