author     Dongjoon Hyun <dongjoon@apache.org>   2016-08-12 14:40:12 +0800
committer  Cheng Lian <lian@databricks.com>      2016-08-12 14:40:12 +0800
commit     abff92bfdc7d4c9d2308794f0350561fe0ceb4dd (patch)
tree       6ec663f2fe5a1c7315e3cc8e291d9b8e1c83da35 /sql/core
parent     ccc6dc0f4b62837c73fca0e3c8b9c14be798b062 (diff)
[SPARK-16975][SQL] Column-partition path starting '_' should be handled correctly
## What changes were proposed in this pull request?

Currently, Spark ignores path names starting with underscore `_` and `.`. This causes read failures for column-partitioned file data sources whose partition column names start with `_`, e.g. `_col`.

**Before**
```scala
scala> spark.range(10).withColumn("_locality_code", $"id").write.partitionBy("_locality_code").save("/tmp/parquet")
scala> spark.read.parquet("/tmp/parquet")
org.apache.spark.sql.AnalysisException: Unable to infer schema for ParquetFormat at /tmp/parquet20. It must be specified manually;
```

**After**
```scala
scala> spark.range(10).withColumn("_locality_code", $"id").write.partitionBy("_locality_code").save("/tmp/parquet")
scala> spark.read.parquet("/tmp/parquet")
res2: org.apache.spark.sql.DataFrame = [id: bigint, _locality_code: int]
```

## How was this patch tested?

Passes Jenkins with a new test case.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #14585 from dongjoon-hyun/SPARK-16975-PARQUET.
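For readers skimming the diff below, here is a minimal, standalone sketch of the revised name-filter rule that each touched file now applies. It is not the exact Spark code; `isHiddenPath` is a hypothetical helper name used only for illustration.

```scala
// Sketch of the revised rule: a leading '_' no longer hides a name when it
// contains '=', because "column=value" is how partition directories are encoded.
object PathFilterSketch {
  def isHiddenPath(name: String): Boolean =
    (name.startsWith("_") && !name.contains("=")) || name.startsWith(".")

  def main(args: Array[String]): Unit = {
    val names = Seq("_SUCCESS", "_locality_code=3", ".part-00000.crc", "part-00000.parquet")
    names.foreach(n => println(s"$n -> hidden = ${isHiddenPath(n)}"))
    // _SUCCESS           -> hidden = true
    // _locality_code=3   -> hidden = false  (now treated as a partition/data path)
    // .part-00000.crc    -> hidden = true
    // part-00000.parquet -> hidden = false
  }
}
```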
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala | 3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 9
5 files changed, 14 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index 811e96c99a..cef9d4d9c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -204,6 +204,6 @@ abstract class PartitioningAwareFileCatalog(
private def isDataPath(path: Path): Boolean = {
val name = path.getName
- !(name.startsWith("_") || name.startsWith("."))
+ !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index f068779b3e..e03a2323c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -364,7 +364,7 @@ object HadoopFsRelation extends Logging {
// We filter everything that starts with _ and ., except _common_metadata and _metadata
// because Parquet needs to find those metadata files from leaf files returned by this method.
// We should refactor this logic to not mix metadata files with data files.
- (pathName.startsWith("_") || pathName.startsWith(".")) &&
+ ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) &&
!pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata")
}
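The hunk above only shows the predicate itself; combined with the existing exceptions for Parquet summary files mentioned in the comment, the complete rule reads roughly as below. This is a hedged, standalone rendering, and the object and method names are assumed for illustration rather than taken from fileSourceInterfaces.scala.

```scala
// Standalone rendering of the full filter predicate after this change:
// partition directories ("name=value") and Parquet summary files survive,
// other '_'- or '.'-prefixed names are still filtered out.
object FileSourceFilterSketch {
  def isHiddenLeafFile(pathName: String): Boolean =
    ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) &&
      !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata")

  def main(args: Array[String]): Unit = {
    Seq("_metadata", "_common_metadata", "_locality_code=3", "_SUCCESS", ".hidden")
      .foreach(n => println(s"$n hidden = ${isHiddenLeafFile(n)}"))
    // _metadata = false, _common_metadata = false, _locality_code=3 = false,
    // _SUCCESS = true, .hidden = true
  }
}
```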
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 19681be604..27910e2cdd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -54,7 +54,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
.getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
val jsonFiles = files.filterNot { status =>
val name = status.getPath.getName
- name.startsWith("_") || name.startsWith(".")
+ (name.startsWith("_") && !name.contains("=")) || name.startsWith(".")
}.toArray
val jsonSchema = InferSchema.infer(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 7794f31331..9c4778acf5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -236,7 +236,8 @@ class ParquetFileFormat
// Lists `FileStatus`es of all leaf nodes (files) under all base directories.
val leaves = allFiles.filter { f =>
isSummaryFile(f.getPath) ||
- !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
+ !((f.getPath.getName.startsWith("_") && !f.getPath.getName.contains("=")) ||
+ f.getPath.getName.startsWith("."))
}.toArray.sortBy(_.getPath.toString)
FileTypes(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index eac588fff2..4fcde58833 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql
+import java.io.File
import java.math.MathContext
import java.sql.{Date, Timestamp}
@@ -2637,6 +2638,14 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}
+ test("SPARK-16975: Column-partition path starting '_' should be handled correctly") {
+ withTempDir { dir =>
+ val parquetDir = new File(dir, "parquet").getCanonicalPath
+ spark.range(10).withColumn("_col", $"id").write.partitionBy("_col").save(parquetDir)
+ spark.read.parquet(parquetDir)
+ }
+ }
+
test("SPARK-16644: Aggregate should not put aggregate expressions to constraints") {
withTable("tbl") {
sql("CREATE TABLE tbl(a INT, b INT) USING parquet")