diff options
author | Cheng Lian <lian@databricks.com> | 2015-02-25 15:15:22 -0800 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-02-25 15:15:22 -0800 |
commit | e0fdd467e277867d6bec5c6605cc1cabce70ac89 (patch) | |
tree | bf39791fff8f005c11d6a121492e104ff946dcf8 | |
parent | f3f4c87b3d944c10d1200dfe49091ebb2a149be6 (diff) | |
download | spark-e0fdd467e277867d6bec5c6605cc1cabce70ac89.tar.gz spark-e0fdd467e277867d6bec5c6605cc1cabce70ac89.tar.bz2 spark-e0fdd467e277867d6bec5c6605cc1cabce70ac89.zip |
[SPARK-6010] [SQL] Merging compatible Parquet schemas before computing splits
`ReadContext.init` calls `InitContext.getMergedKeyValueMetadata`, which doesn't know how to merge conflicting user defined key-value metadata and throws exception. In our case, when dealing with different but compatible schemas, we have different Spark SQL schema JSON strings in different Parquet part-files, thus causes this problem. Reading similar Parquet files generated by Hive doesn't suffer from this issue.
In this PR, we manually merge the schemas before passing it to `ReadContext` to avoid the exception.
<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/4768)
<!-- Reviewable:end -->
Author: Cheng Lian <lian@databricks.com>
Closes #4768 from liancheng/spark-6010 and squashes the following commits:
9002f0a [Cheng Lian] Fixes SPARK-6010
3 files changed, 45 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index 65966458eb..4dc13b036c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -48,6 +48,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLConf import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row, _} import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode} +import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.{Logging, SerializableWritable, TaskContext} /** @@ -459,13 +460,30 @@ private[parquet] class FilteringParquetRowInputFormat val getGlobalMetaData = classOf[ParquetFileWriter].getDeclaredMethod("getGlobalMetaData", classOf[JList[Footer]]) getGlobalMetaData.setAccessible(true) - val globalMetaData = getGlobalMetaData.invoke(null, footers).asInstanceOf[GlobalMetaData] + var globalMetaData = getGlobalMetaData.invoke(null, footers).asInstanceOf[GlobalMetaData] if (globalMetaData == null) { val splits = mutable.ArrayBuffer.empty[ParquetInputSplit] return splits } + Option(globalMetaData.getKeyValueMetaData.get(RowReadSupport.SPARK_METADATA_KEY)).foreach { + schemas => + val mergedSchema = schemas + .map(DataType.fromJson(_).asInstanceOf[StructType]) + .reduce(_ merge _) + .json + + val mergedMetadata = globalMetaData + .getKeyValueMetaData + .updated(RowReadSupport.SPARK_METADATA_KEY, setAsJavaSet(Set(mergedSchema))) + + globalMetaData = new GlobalMetaData( + globalMetaData.getSchema, + mergedMetadata, + globalMetaData.getCreatedBy) + } + val readContext = getReadSupport(configuration).init( new InitContext(configuration, globalMetaData.getKeyValueMetaData, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala index 0fa2fe90f9..d6ea6679c5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala @@ -131,6 +131,11 @@ private[sql] trait ParquetTest { data.toDF().save(path.getCanonicalPath, "org.apache.spark.sql.parquet", SaveMode.Overwrite) } + protected def makeParquetFile[T <: Product: ClassTag: TypeTag]( + df: DataFrame, path: File): Unit = { + df.save(path.getCanonicalPath, "org.apache.spark.sql.parquet", SaveMode.Overwrite) + } + protected def makePartitionDir( basePath: File, defaultPartitionName: String, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala index 3bf0116c8f..adb3c9391f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala @@ -36,6 +36,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest { override val sqlContext: SQLContext = TestSQLContext import sqlContext._ + import sqlContext.implicits._ val defaultPartitionName = "__NULL__" @@ -319,4 +320,24 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest { } } } + + test("read partitioned table - merging compatible schemas") { + withTempDir { base => + makeParquetFile( + (1 to 10).map(i => Tuple1(i)).toDF("intField"), + makePartitionDir(base, defaultPartitionName, "pi" -> 1)) + + makeParquetFile( + (1 to 10).map(i => (i, i.toString)).toDF("intField", "stringField"), + makePartitionDir(base, defaultPartitionName, "pi" -> 2)) + + load(base.getCanonicalPath, "org.apache.spark.sql.parquet").registerTempTable("t") + + withTempTable("t") { + checkAnswer( + sql("SELECT * FROM t"), + (1 to 10).map(i => Row(i, null, 1)) ++ (1 to 10).map(i => Row(i, i.toString, 2))) + } + } + } } |