-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala                       | 18
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala | 11
2 files changed, 24 insertions(+), 5 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index 014abd454f..9a08524476 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructField, StructType}
 
 
 /**
@@ -49,10 +51,16 @@ case class HadoopFsRelation(
   override def sqlContext: SQLContext = sparkSession.sqlContext
 
   val schema: StructType = {
-    val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
-    StructType(dataSchema ++ partitionSchema.filterNot { column =>
-      dataSchemaColumnNames.contains(column.name.toLowerCase)
-    })
+    val getColName: (StructField => String) =
+      if (sparkSession.sessionState.conf.caseSensitiveAnalysis) _.name else _.name.toLowerCase
+    val overlappedPartCols = mutable.Map.empty[String, StructField]
+    partitionSchema.foreach { partitionField =>
+      if (dataSchema.exists(getColName(_) == getColName(partitionField))) {
+        overlappedPartCols += getColName(partitionField) -> partitionField
+      }
+    }
+    StructType(dataSchema.map(f => overlappedPartCols.getOrElse(getColName(f), f)) ++
+      partitionSchema.filterNot(f => overlappedPartCols.contains(getColName(f))))
   }
 
   def partitionSchemaOption: Option[StructType] =
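
Aside: the merge rule added above can be exercised outside Spark's session machinery. Below is a minimal standalone sketch, not part of the patch; it assumes only spark-sql's types package on the classpath, and the `caseSensitive` parameter is a stand-in for `sparkSession.sessionState.conf.caseSensitiveAnalysis`, which needs a live session:

    import scala.collection.mutable

    import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StructField, StructType}

    // Same rule as the patch: a partition column that collides with a data column
    // (by exact or lowercased name) replaces it in place, preserving the data
    // schema's column order; non-colliding partition columns are appended.
    def mergedSchema(
        dataSchema: StructType,
        partitionSchema: StructType,
        caseSensitive: Boolean): StructType = {
      val getColName: StructField => String =
        if (caseSensitive) _.name else _.name.toLowerCase
      val overlappedPartCols = mutable.Map.empty[String, StructField]
      partitionSchema.foreach { partitionField =>
        if (dataSchema.exists(getColName(_) == getColName(partitionField))) {
          overlappedPartCols += getColName(partitionField) -> partitionField
        }
      }
      StructType(dataSchema.map(f => overlappedPartCols.getOrElse(getColName(f), f)) ++
        partitionSchema.filterNot(f => overlappedPartCols.contains(getColName(f))))
    }

    // Shapes from the test below: data (a: long, b: double), partition (a: int).
    // The partition column's type wins, at the data column's position:
    //   merged == StructType(Seq(StructField("a", IntegerType), StructField("b", DoubleType)))
    val merged = mergedSchema(
      StructType(Seq(StructField("a", LongType), StructField("b", DoubleType))),
      StructType(Seq(StructField("a", IntegerType))),
      caseSensitive = false)
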
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 22e35a1bc0..f433a74da8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -969,4 +969,15 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       ))
     }
   }
+
+  test("SPARK-18108 Parquet reader fails when data column types conflict with partition ones") {
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        val df = Seq((1L, 2.0)).toDF("a", "b")
+        df.write.parquet(s"$path/a=1")
+        checkAnswer(spark.read.parquet(s"$path"), Seq(Row(1, 2.0)))
+      }
+    }
+  }
 }
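
And the scenario the new test pins down, as an illustrative spark-shell walkthrough (hypothetical path /tmp/spark-18108; assumes an ambient SparkSession named `spark` with the vectorized Parquet reader at its default, enabled):

    import spark.implicits._

    // "a" is both a data column (long, from the rows) and a discovered partition
    // column (int, inferred from the directory name) once the parent is read.
    val df = Seq((1L, 2.0)).toDF("a", "b")
    df.write.parquet("/tmp/spark-18108/a=1")

    val read = spark.read.parquet("/tmp/spark-18108")
    read.printSchema()
    // With this patch the partition column's type replaces the data column's
    // in place, so the schema comes back roughly as:
    //   root
    //    |-- a: integer (nullable = true)
    //    |-- b: double (nullable = true)
    // and collecting yields Row(1, 2.0), matching the checkAnswer above.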