author    Cheng Lian <lian@databricks.com>    2015-02-25 15:15:22 -0800
committer Michael Armbrust <michael@databricks.com>    2015-02-25 15:15:22 -0800
commit    e0fdd467e277867d6bec5c6605cc1cabce70ac89 (patch)
tree      bf39791fff8f005c11d6a121492e104ff946dcf8
parent    f3f4c87b3d944c10d1200dfe49091ebb2a149be6 (diff)
[SPARK-6010] [SQL] Merging compatible Parquet schemas before computing splits
[SPARK-6010] [SQL] Merging compatible Parquet schemas before computing splits

`ReadContext.init` calls `InitContext.getMergedKeyValueMetadata`, which doesn't know how to merge conflicting user-defined key-value metadata and throws an exception. In our case, when dealing with different but compatible schemas, different Parquet part-files carry different Spark SQL schema JSON strings, which triggers this problem. Reading similar Parquet files generated by Hive doesn't suffer from this issue. In this PR, we manually merge the schemas before passing them to `ReadContext` to avoid the exception.

Author: Cheng Lian <lian@databricks.com>

Closes #4768 from liancheng/spark-6010 and squashes the following commits:

9002f0a [Cheng Lian] Fixes SPARK-6010
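For readers skimming the diff below: the heart of the change is folding all of the per-part-file Spark SQL schema JSON strings into a single merged schema before the metadata reaches `ReadContext`. The following is a minimal, hypothetical sketch of that step in isolation, not code from this commit; it assumes Spark 1.3-era types and that it is compiled inside Spark's own `org.apache.spark.sql.parquet` package, where `DataType.fromJson` and `StructType.merge` are accessible.

```scala
package org.apache.spark.sql.parquet

import org.apache.spark.sql.types.{DataType, StructType}

// Hypothetical helper restating the merge step from FilteringParquetRowInputFormat:
// given the schema JSON strings found under RowReadSupport.SPARK_METADATA_KEY in each
// part-file, produce one merged schema JSON string.
private[parquet] object MergedSchemaSketch {
  def mergeSchemaJson(schemaJsonStrings: Seq[String]): String =
    schemaJsonStrings
      .map(DataType.fromJson(_).asInstanceOf[StructType]) // parse each part-file's schema
      .reduce(_ merge _)                                  // fold compatible schemas into one StructType
      .json                                               // serialize the merged schema back to JSON
}
```

The actual patch goes one step further: it writes the merged JSON back into the key-value metadata under the same key, so that `getMergedKeyValueMetadata` no longer sees conflicting values.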
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala  20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala  5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala  21
3 files changed, 45 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 65966458eb..4dc13b036c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -48,6 +48,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row, _}
import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
+import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.{Logging, SerializableWritable, TaskContext}
/**
@@ -459,13 +460,30 @@ private[parquet] class FilteringParquetRowInputFormat
    val getGlobalMetaData =
      classOf[ParquetFileWriter].getDeclaredMethod("getGlobalMetaData", classOf[JList[Footer]])
    getGlobalMetaData.setAccessible(true)
-    val globalMetaData = getGlobalMetaData.invoke(null, footers).asInstanceOf[GlobalMetaData]
+    var globalMetaData = getGlobalMetaData.invoke(null, footers).asInstanceOf[GlobalMetaData]
    if (globalMetaData == null) {
      val splits = mutable.ArrayBuffer.empty[ParquetInputSplit]
      return splits
    }
+    Option(globalMetaData.getKeyValueMetaData.get(RowReadSupport.SPARK_METADATA_KEY)).foreach {
+      schemas =>
+        val mergedSchema = schemas
+          .map(DataType.fromJson(_).asInstanceOf[StructType])
+          .reduce(_ merge _)
+          .json
+
+        val mergedMetadata = globalMetaData
+          .getKeyValueMetaData
+          .updated(RowReadSupport.SPARK_METADATA_KEY, setAsJavaSet(Set(mergedSchema)))
+
+        globalMetaData = new GlobalMetaData(
+          globalMetaData.getSchema,
+          mergedMetadata,
+          globalMetaData.getCreatedBy)
+    }
+
    val readContext = getReadSupport(configuration).init(
      new InitContext(configuration,
        globalMetaData.getKeyValueMetaData,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
index 0fa2fe90f9..d6ea6679c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
@@ -131,6 +131,11 @@ private[sql] trait ParquetTest {
    data.toDF().save(path.getCanonicalPath, "org.apache.spark.sql.parquet", SaveMode.Overwrite)
  }
+  protected def makeParquetFile[T <: Product: ClassTag: TypeTag](
+      df: DataFrame, path: File): Unit = {
+    df.save(path.getCanonicalPath, "org.apache.spark.sql.parquet", SaveMode.Overwrite)
+  }
+
  protected def makePartitionDir(
      basePath: File,
      defaultPartitionName: String,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index 3bf0116c8f..adb3c9391f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -36,6 +36,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
  override val sqlContext: SQLContext = TestSQLContext
  import sqlContext._
+  import sqlContext.implicits._
  val defaultPartitionName = "__NULL__"
@@ -319,4 +320,24 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
      }
    }
  }
+
+  test("read partitioned table - merging compatible schemas") {
+    withTempDir { base =>
+      makeParquetFile(
+        (1 to 10).map(i => Tuple1(i)).toDF("intField"),
+        makePartitionDir(base, defaultPartitionName, "pi" -> 1))
+
+      makeParquetFile(
+        (1 to 10).map(i => (i, i.toString)).toDF("intField", "stringField"),
+        makePartitionDir(base, defaultPartitionName, "pi" -> 2))
+
+      load(base.getCanonicalPath, "org.apache.spark.sql.parquet").registerTempTable("t")
+
+      withTempTable("t") {
+        checkAnswer(
+          sql("SELECT * FROM t"),
+          (1 to 10).map(i => Row(i, null, 1)) ++ (1 to 10).map(i => Row(i, i.toString, 2)))
+      }
+    }
+  }
}