author     Takeshi YAMAMURO <linguin.m.s@gmail.com>	2016-12-16 22:44:42 +0800
committer  Wenchen Fan <wenchen@databricks.com>	2016-12-16 22:44:42 +0800
commit     dc2a4d4ad478fdb0486cc0515d4fe8b402d24db4 (patch)
tree       5d99d1e7bcefa1a6e208de500b40f7d4cfeec62c /sql
parent     53ab8fb34682203d2138567c0a7deff34e0da9c5 (diff)
[SPARK-18108][SQL] Fix a schema inconsistency bug that makes the parquet reader fail to read data
## What changes were proposed in this pull request?

The vectorized parquet reader fails to read column data if the data schema and the partition schema overlap and the types inferred for the partition schema differ from those in the data schema. Example code to reproduce this bug:

```
scala> case class A(a: Long, b: Int)
scala> val as = Seq(A(1, 2))
scala> spark.createDataFrame(as).write.parquet("/data/a=1/")
scala> val df = spark.read.parquet("/data/")
scala> df.printSchema
root
 |-- a: long (nullable = true)
 |-- b: integer (nullable = true)
scala> df.collect
java.lang.NullPointerException
  at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getLong(OnHeapColumnVector.java:283)
  at org.apache.spark.sql.execution.vectorized.ColumnarBatch$Row.getLong(ColumnarBatch.java:191)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
```

The root cause is that the logical layer (`HadoopFsRelation`) and the physical layer (`VectorizedParquetRecordReader`) make different assumptions about the partition schema: the logical layer trusts the data schema to infer the types of the overlapped partition columns, while the physical layer trusts the partition schema, which is inferred from the path string. To fix this bug, this PR simply updates `HadoopFsRelation.schema` so that overlapped partition columns keep their position from the data schema but take their type from the partition schema.

## How was this patch tested?

Added a test in `ParquetPartitionDiscoverySuite`.

Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>

Closes #16030 from maropu/SPARK-18108.
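To make the proposed merge rule concrete, here is a minimal, hypothetical standalone sketch (not part of this patch) that mirrors the new `HadoopFsRelation.schema` logic: overlapped columns keep their position from the data schema but are replaced by the corresponding partition-schema field, and the remaining partition columns are appended at the end. The object name `SchemaMergeSketch` and the sample schemas are illustrative only.

```scala
import scala.collection.mutable

import org.apache.spark.sql.types._

// Hypothetical re-implementation of the merge rule described above, for illustration only.
object SchemaMergeSketch {

  def merge(
      dataSchema: StructType,
      partitionSchema: StructType,
      caseSensitive: Boolean): StructType = {
    val getColName: StructField => String =
      if (caseSensitive) _.name else _.name.toLowerCase
    // Collect partition columns whose (possibly case-normalized) name also appears
    // in the data schema.
    val overlapped = mutable.Map.empty[String, StructField]
    partitionSchema.foreach { pf =>
      if (dataSchema.exists(f => getColName(f) == getColName(pf))) {
        overlapped += getColName(pf) -> pf
      }
    }
    // Overlapped columns keep the data schema's position but use the partition field;
    // the non-overlapped partition columns are appended after the data columns.
    StructType(dataSchema.map(f => overlapped.getOrElse(getColName(f), f)) ++
      partitionSchema.filterNot(f => overlapped.contains(getColName(f))))
  }

  def main(args: Array[String]): Unit = {
    // Mirrors the reproduction above: data column `a` is LongType in the parquet footer,
    // but partition discovery infers IntegerType from the path segment "a=1".
    val dataSchema = StructType(Seq(StructField("a", LongType), StructField("b", IntegerType)))
    val partitionSchema = StructType(Seq(StructField("a", IntegerType)))
    println(merge(dataSchema, partitionSchema, caseSensitive = false))
    // Roughly: StructType(StructField(a,IntegerType,true), StructField(b,IntegerType,true))
  }
}
```

Applied to the reproduction, the merged schema keeps `a` in its original position but with `IntegerType`, which is the type the vectorized reader actually materializes at read time.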
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala  | 18
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala  | 11
2 files changed, 24 insertions, 5 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index 014abd454f..9a08524476 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -17,11 +17,13 @@
package org.apache.spark.sql.execution.datasources
+import scala.collection.mutable
+
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructField, StructType}
/**
@@ -49,10 +51,16 @@ case class HadoopFsRelation(
override def sqlContext: SQLContext = sparkSession.sqlContext
val schema: StructType = {
- val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
- StructType(dataSchema ++ partitionSchema.filterNot { column =>
- dataSchemaColumnNames.contains(column.name.toLowerCase)
- })
+ val getColName: (StructField => String) =
+ if (sparkSession.sessionState.conf.caseSensitiveAnalysis) _.name else _.name.toLowerCase
+ val overlappedPartCols = mutable.Map.empty[String, StructField]
+ partitionSchema.foreach { partitionField =>
+ if (dataSchema.exists(getColName(_) == getColName(partitionField))) {
+ overlappedPartCols += getColName(partitionField) -> partitionField
+ }
+ }
+ StructType(dataSchema.map(f => overlappedPartCols.getOrElse(getColName(f), f)) ++
+ partitionSchema.filterNot(f => overlappedPartCols.contains(getColName(f))))
}
def partitionSchemaOption: Option[StructType] =
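The column-name matching above is keyed off `sparkSession.sessionState.conf.caseSensitiveAnalysis`, which users control through the `spark.sql.caseSensitive` setting. A brief, hedged illustration of toggling it (assuming an existing `spark` session); the behavioral notes in the comments follow from the patched code above:

```scala
// Default: case-insensitive analysis, so a partition column "A" is treated as
// overlapping with a data column "a" and replaces it in place.
spark.conf.set("spark.sql.caseSensitive", "false")

// Case-sensitive analysis: "A" and "a" are distinct columns, so the partition
// column is appended after the data columns instead of replacing "a".
spark.conf.set("spark.sql.caseSensitive", "true")
```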
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 22e35a1bc0..f433a74da8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -969,4 +969,15 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with SharedSQLContext {
))
}
}
+
+ test("SPARK-18108 Parquet reader fails when data column types conflict with partition ones") {
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ val df = Seq((1L, 2.0)).toDF("a", "b")
+ df.write.parquet(s"$path/a=1")
+ checkAnswer(spark.read.parquet(s"$path"), Seq(Row(1, 2.0)))
+ }
+ }
+ }
}
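For context, a hedged sketch of what the reproduction from the commit message should look like once this patch is applied; the console formatting is approximate, but the key point is that column `a` now takes the `IntegerType` inferred from the partition path, so the vectorized reader no longer mismatches the column type:

```scala
// spark-shell, after applying this patch (illustrative output)
case class A(a: Long, b: Int)
val as = Seq(A(1, 2))
spark.createDataFrame(as).write.parquet("/data/a=1/")

val df = spark.read.parquet("/data/")
df.printSchema
// root
//  |-- a: integer (nullable = true)   <- type now comes from partition discovery
//  |-- b: integer (nullable = true)
df.collect
// Array([1,2])                        <- no NullPointerException anymore
```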