author    Cheng Lian <lian@databricks.com>  2015-05-26 20:48:56 -0700
committer Yin Huai <yhuai@databricks.com>  2015-05-26 20:48:56 -0700
commit b463e6d618e69c535297e51f41eca4f91bd33cc8 (patch)
tree   211ed6394d8ee89e3944d2bb9076f07c5335f802
parent 0c33c7b4a66e47f6246f1b7f2b96f2c33126ec63 (diff)
[SPARK-7868] [SQL] Ignores _temporary directories in HadoopFsRelation
So that potential partial/corrupted data files left by failed tasks/jobs won't affect normal data scans.

Author: Cheng Lian <lian@databricks.com>

Closes #6411 from liancheng/spark-7868 and squashes the following commits:

273ea36 [Cheng Lian] Ignores _temporary directories
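The essence of the change is a single pruning predicate applied during leaf-file discovery. A minimal standalone sketch, assuming only Hadoop's Path API (the isTemporaryDir name is illustrative, not part of the patch):

    import org.apache.hadoop.fs.Path

    // Prune only "_temporary" directories (case-insensitively). Other "_"- or
    // "."-prefixed entries are kept, because formats like Parquet rely on
    // _metadata and _common_metadata summary files.
    def isTemporaryDir(path: Path): Boolean =
      path.getName.toLowerCase == "_temporary"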
 sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala             | 20 +++++++++++++-------
 sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala | 16 ++++++++++++++++
 2 files changed, 29 insertions(+), 7 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index aaabbadcd6..c06026e042 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -31,7 +31,7 @@ import org.apache.spark.SerializableWritable
 import org.apache.spark.sql.{Row, _}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.StructType
 
 /**
  * ::DeveloperApi::
@@ -378,16 +378,22 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
     var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
 
     def refresh(): Unit = {
+      // We don't filter out files/directories whose names start with "_" or "." here, as
+      // specific data sources may take advantage of them (e.g. Parquet _metadata and
+      // _common_metadata files). But "_temporary" directories are explicitly ignored, since
+      // failed tasks/jobs may leave partial/corrupted data files there.
       def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Set[FileStatus] = {
-        val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
-        val leafDirs = if (dirs.isEmpty) Set(status) else Set.empty[FileStatus]
-        files.toSet ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir))
+        if (status.getPath.getName.toLowerCase == "_temporary") {
+          Set.empty
+        } else {
+          val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
+          val leafDirs = if (dirs.isEmpty) Set(status) else Set.empty[FileStatus]
+          files.toSet ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir))
+        }
       }
 
       leafFiles.clear()
 
-      // We don't filter files/directories like _temporary/_SUCCESS here, as specific data sources
-      // may take advantages over them (e.g. Parquet _metadata and _common_metadata files).
       val statuses = paths.flatMap { path =>
         val hdfsPath = new Path(path)
         val fs = hdfsPath.getFileSystem(hadoopConf)
@@ -395,7 +401,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
         Try(fs.getFileStatus(qualified)).toOption.toArray.flatMap(listLeafFilesAndDirs(fs, _))
       }
 
-      val (dirs, files) = statuses.partition(_.isDir)
+      val files = statuses.filterNot(_.isDir)
       leafFiles ++= files.map(f => f.getPath -> f).toMap
       leafDirToChildrenFiles ++= files.groupBy(_.getPath.getParent)
     }
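Pulled out of the diff above, the listing logic can be exercised on its own against any Hadoop-compatible file system. A self-contained sketch, assuming the Hadoop 1.x-era FileStatus.isDir API that Spark used at the time; the /data/table path is hypothetical:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

    // Recursively collect leaf files and leaf directories, pruning entire
    // "_temporary" subtrees so leftovers from failed tasks are never scanned.
    def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Set[FileStatus] = {
      if (status.getPath.getName.toLowerCase == "_temporary") {
        Set.empty
      } else {
        val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
        val leafDirs = if (dirs.isEmpty) Set(status) else Set.empty[FileStatus]
        files.toSet ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir))
      }
    }

    // Example usage (the path is hypothetical):
    val fs = FileSystem.get(new Configuration())
    val leaves = listLeafFilesAndDirs(fs, fs.getFileStatus(new Path("/data/table")))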
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 7c02d563f8..cf5ae88dc4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -548,4 +548,20 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
       checkAnswer(table("t"), df.select('b, 'c, 'a).collect())
     }
   }
+
+  test("SPARK-7868: _temporary directories should be ignored") {
+    withTempPath { dir =>
+      val df = Seq("a", "b", "c").zipWithIndex.toDF()
+
+      df.write
+        .format("parquet")
+        .save(dir.getCanonicalPath)
+
+      df.write
+        .format("parquet")
+        .save(s"${dir.getCanonicalPath}/_temporary")
+
+      checkAnswer(read.format("parquet").load(dir.getCanonicalPath), df.collect())
+    }
+  }
 }
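The scenario this test covers can also be reproduced interactively. A hedged spark-shell sketch against Spark 1.4-era APIs, with sqlContext.implicits._ in scope; the /tmp/t path is hypothetical:

    import sqlContext.implicits._

    val df = Seq("a", "b", "c").zipWithIndex.toDF()
    df.write.format("parquet").save("/tmp/t")             // the real data set
    df.write.format("parquet").save("/tmp/t/_temporary")  // simulate leftovers from a failed job

    // With this patch the scan sees only the three real rows; before it, the
    // extra files under _temporary would have leaked into the result.
    sqlContext.read.format("parquet").load("/tmp/t").count()  // 3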