author     Cheng Lian <lian@databricks.com>    2015-06-03 00:59:50 +0800
committer  Cheng Lian <lian@databricks.com>    2015-06-03 00:59:50 +0800
commit     1bb5d716c0351cd0b4c11b397fd778f30db39bd9 (patch)
tree       5a3bec7b033d96e0a6e3af4f67d76a9d2280f81a /sql
parent     bd97840d5ccc3f0bfde1e5cfc7abeac9681997ab (diff)
[SPARK-8037] [SQL] Ignores files whose name starts with dot in HadoopFsRelation
Author: Cheng Lian <lian@databricks.com>

Closes #6581 from liancheng/spark-8037 and squashes the following commits:

d08e97b [Cheng Lian] Ignores files whose name starts with dot in HadoopFsRelation
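The core of the patch, as a minimal Scala sketch (the helper name `dropHidden` is assumed here for illustration; the real change is inlined in HadoopFsRelation.refresh(), shown in the diff below): when enumerating leaf files and directories, any entry whose base name starts with a dot is dropped, so stray hidden files such as ".DS_Store" no longer break partition discovery.

    import org.apache.hadoop.fs.FileStatus

    // Sketch only: drop any file/directory whose base name starts with "."
    def dropHidden(statuses: Array[FileStatus]): Array[FileStatus] =
      statuses.filterNot(_.getPath.getName.startsWith("."))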
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala               |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala                      | 11
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala  | 19
3 files changed, 26 insertions, 6 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
index dafdf0f8b4..c4c99de5a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
@@ -187,7 +187,7 @@ private[sql] object PartitioningUtils {
Seq.empty
} else {
assert(distinctPartitionsColNames.size == 1, {
- val list = distinctPartitionsColNames.mkString("\t", "\n", "")
+ val list = distinctPartitionsColNames.mkString("\t", "\n\t", "")
s"Conflicting partition column names detected:\n$list"
})
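This first hunk is a small, unrelated fix to the assertion message: with "\n" as the separator, only the first list of conflicting column names was tab-indented; "\n\t" indents every line. Plain Scala, checkable in a REPL:

    val lists = Seq("Seq(a, b)", "Seq(a, c)")
    lists.mkString("\t", "\n", "")   // "\tSeq(a, b)\nSeq(a, c)"   -- second line flush left
    lists.mkString("\t", "\n\t", "") // "\tSeq(a, b)\n\tSeq(a, c)" -- every line indented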
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index b1b997c030..c4ffa8de52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -379,10 +379,10 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
def refresh(): Unit = {
- // We don't filter files/directories whose name start with "_" or "." here, as specific data
- // sources may take advantages over them (e.g. Parquet _metadata and _common_metadata files).
- // But "_temporary" directories are explicitly ignored since failed tasks/jobs may leave
- // partial/corrupted data files there.
+ // We don't filter files/directories whose name start with "_" except "_temporary" here, as
+ // specific data sources may take advantages over them (e.g. Parquet _metadata and
+ // _common_metadata files). "_temporary" directories are explicitly ignored since failed
+ // tasks/jobs may leave partial/corrupted data files there.
def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Set[FileStatus] = {
if (status.getPath.getName.toLowerCase == "_temporary") {
Set.empty
@@ -400,6 +400,9 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
val fs = hdfsPath.getFileSystem(hadoopConf)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
Try(fs.getFileStatus(qualified)).toOption.toArray.flatMap(listLeafFilesAndDirs(fs, _))
+ }.filterNot { status =>
+ // SPARK-8037: Ignores files like ".DS_Store" and other hidden files/directories
+ status.getPath.getName.startsWith(".")
}
val files = statuses.filterNot(_.isDir)
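Taken together, the listing policy after this hunk behaves roughly like the predicate below (a sketch with an assumed helper name, not the literal code): "_temporary" directories and dot-prefixed entries are skipped, while other underscore-prefixed entries still pass through.

    // Sketch only; `keep` is a hypothetical name.
    def keep(name: String): Boolean =
      name.toLowerCase != "_temporary" && // failed tasks/jobs may leave partial data here
      !name.startsWith(".")               // SPARK-8037: hidden entries like ".DS_Store"
    // "_metadata" and "_common_metadata" still pass, so Parquet can keep using them.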
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index f231589e96..3b29979452 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -18,10 +18,11 @@ package org.apache.spark.sql.parquet
import java.io.File
import java.math.BigInteger
-import java.sql.{Timestamp, Date}
+import java.sql.Timestamp
import scala.collection.mutable.ArrayBuffer
+import com.google.common.io.Files
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.expressions.Literal
@@ -432,4 +433,20 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
checkAnswer(read.load(dir.toString).select(fields: _*), row)
}
}
+
+ test("SPARK-8037: Ignores files whose name starts with dot") {
+ withTempPath { dir =>
+ val df = (1 to 3).map(i => (i, i, i, i)).toDF("a", "b", "c", "d")
+
+ df.write
+ .format("parquet")
+ .partitionBy("b", "c", "d")
+ .save(dir.getCanonicalPath)
+
+ Files.touch(new File(s"${dir.getCanonicalPath}/b=1", ".DS_Store"))
+ Files.createParentDirs(new File(s"${dir.getCanonicalPath}/b=1/c=1/.foo/bar"))
+
+ checkAnswer(read.format("parquet").load(dir.getCanonicalPath), df)
+ }
+ }
}
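The regression test leans on two Guava helpers (com.google.common.io.Files): Files.touch creates an empty file, here the hidden ".DS_Store" inside a partition directory, and Files.createParentDirs creates the missing parent directories of the given file, here the hidden ".foo" directory under b=1/c=1. The final checkAnswer passes only if both hidden entries are ignored during partition discovery.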