aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala9
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala16
2 files changed, 15 insertions, 10 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index c06c2e396b..6bb1c47dba 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -58,6 +58,15 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
getConf("spark.sql.hive.convertMetastoreParquet", "true") == "true"
/**
+ * When true, also tries to merge possibly different but compatible Parquet schemas in different
+ * Parquet data files.
+ *
+ * This configuration is only effective when "spark.sql.hive.convertMetastoreParquet" is true.
+ */
+ protected[sql] def convertMetastoreParquetWithSchemaMerging: Boolean =
+ getConf("spark.sql.hive.convertMetastoreParquet.mergeSchema", "false") == "true"
+
+ /**
* When true, a table created by a Hive CTAS statement (no USING clause) will be
* converted to a data source table, using the data source set by spark.sql.sources.default.
* The table in CTAS statement will be converted when it meets any of the following conditions:
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index f20f0ad99f..2b5d031741 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -218,6 +218,10 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation = {
val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
+ val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
+ val parquetOptions = Map(
+ ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
+ ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
// NOTE: Instead of passing Metastore schema directly to `ParquetRelation2`, we have to
// serialize the Metastore schema to JSON and pass it as a data source option because of the
@@ -234,18 +238,10 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
val partitionSpec = PartitionSpec(partitionSchema, partitions)
val paths = partitions.map(_.path)
- LogicalRelation(
- ParquetRelation2(
- paths,
- Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json),
- None,
- Some(partitionSpec))(hive))
+ LogicalRelation(ParquetRelation2(paths, parquetOptions, None, Some(partitionSpec))(hive))
} else {
val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
- LogicalRelation(
- ParquetRelation2(
- paths,
- Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json))(hive))
+ LogicalRelation(ParquetRelation2(paths, parquetOptions)(hive))
}
}