From 1bb5d716c0351cd0b4c11b397fd778f30db39bd9 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 3 Jun 2015 00:59:50 +0800 Subject: [SPARK-8037] [SQL] Ignores files whose name starts with dot in HadoopFsRelation Author: Cheng Lian Closes #6581 from liancheng/spark-8037 and squashes the following commits: d08e97b [Cheng Lian] Ignores files whose name starts with dot in HadoopFsRelation --- .../apache/spark/sql/sources/PartitioningUtils.scala | 2 +- .../org/apache/spark/sql/sources/interfaces.scala | 11 +++++++---- .../sql/parquet/ParquetPartitionDiscoverySuite.scala | 19 ++++++++++++++++++- 3 files changed, 26 insertions(+), 6 deletions(-) (limited to 'sql') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala index dafdf0f8b4..c4c99de5a3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala @@ -187,7 +187,7 @@ private[sql] object PartitioningUtils { Seq.empty } else { assert(distinctPartitionsColNames.size == 1, { - val list = distinctPartitionsColNames.mkString("\t", "\n", "") + val list = distinctPartitionsColNames.mkString("\t", "\n\t", "") s"Conflicting partition column names detected:\n$list" }) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index b1b997c030..c4ffa8de52 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -379,10 +379,10 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]] def refresh(): Unit = { - // We don't filter files/directories whose name start with "_" or "." here, as specific data - // sources may take advantages over them (e.g. Parquet _metadata and _common_metadata files). - // But "_temporary" directories are explicitly ignored since failed tasks/jobs may leave - // partial/corrupted data files there. + // We don't filter files/directories whose name start with "_" except "_temporary" here, as + // specific data sources may take advantages over them (e.g. Parquet _metadata and + // _common_metadata files). "_temporary" directories are explicitly ignored since failed + // tasks/jobs may leave partial/corrupted data files there. def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Set[FileStatus] = { if (status.getPath.getName.toLowerCase == "_temporary") { Set.empty @@ -400,6 +400,9 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio val fs = hdfsPath.getFileSystem(hadoopConf) val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) Try(fs.getFileStatus(qualified)).toOption.toArray.flatMap(listLeafFilesAndDirs(fs, _)) + }.filterNot { status => + // SPARK-8037: Ignores files like ".DS_Store" and other hidden files/directories + status.getPath.getName.startsWith(".") } val files = statuses.filterNot(_.isDir) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala index f231589e96..3b29979452 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala @@ -18,10 +18,11 @@ package org.apache.spark.sql.parquet import java.io.File import java.math.BigInteger -import java.sql.{Timestamp, Date} +import java.sql.Timestamp import scala.collection.mutable.ArrayBuffer +import com.google.common.io.Files import org.apache.hadoop.fs.Path import org.apache.spark.sql.catalyst.expressions.Literal @@ -432,4 +433,20 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest { checkAnswer(read.load(dir.toString).select(fields: _*), row) } } + + test("SPARK-8037: Ignores files whose name starts with dot") { + withTempPath { dir => + val df = (1 to 3).map(i => (i, i, i, i)).toDF("a", "b", "c", "d") + + df.write + .format("parquet") + .partitionBy("b", "c", "d") + .save(dir.getCanonicalPath) + + Files.touch(new File(s"${dir.getCanonicalPath}/b=1", ".DS_Store")) + Files.createParentDirs(new File(s"${dir.getCanonicalPath}/b=1/c=1/.foo/bar")) + + checkAnswer(read.format("parquet").load(dir.getCanonicalPath), df) + } + } } -- cgit v1.2.3