aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorCheng Lian <lian@databricks.com>2015-03-31 11:21:15 -0700
committerMichael Armbrust <michael@databricks.com>2015-03-31 11:21:15 -0700
commit81020144708773ba3af4932288ffa09ef901269e (patch)
tree8eda2f98a4b70147804daa582abfb127b5fee093 /sql
parenta7992ffaf1e8adc9d2c225a986fa3162e8e130eb (diff)
downloadspark-81020144708773ba3af4932288ffa09ef901269e.tar.gz
spark-81020144708773ba3af4932288ffa09ef901269e.tar.bz2
spark-81020144708773ba3af4932288ffa09ef901269e.zip
[SPARK-6575] [SQL] Adds configuration to disable schema merging while converting metastore Parquet tables
Consider a metastore Parquet table that 1. doesn't have schema evolution issue 2. has lots of data files and/or partitions In this case, driver schema merging can be both slow and unnecessary. Would be good to have a configuration to let the user disable schema merging when converting such a metastore Parquet table. <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/5231) <!-- Reviewable:end --> Author: Cheng Lian <lian@databricks.com> Closes #5231 from liancheng/spark-6575 and squashes the following commits: cd96159 [Cheng Lian] Adds configuration to disable schema merging while converting metastore Parquet tables
Diffstat (limited to 'sql')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala9
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala16
2 files changed, 15 insertions, 10 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index c06c2e396b..6bb1c47dba 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -58,6 +58,15 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
getConf("spark.sql.hive.convertMetastoreParquet", "true") == "true"
/**
+ * When true, also tries to merge possibly different but compatible Parquet schemas in different
+ * Parquet data files.
+ *
+ * This configuration is only effective when "spark.sql.hive.convertMetastoreParquet" is true.
+ */
+ protected[sql] def convertMetastoreParquetWithSchemaMerging: Boolean =
+ getConf("spark.sql.hive.convertMetastoreParquet.mergeSchema", "false") == "true"
+
+ /**
* When true, a table created by a Hive CTAS statement (no USING clause) will be
* converted to a data source table, using the data source set by spark.sql.sources.default.
* The table in CTAS statement will be converted when it meets any of the following conditions:
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index f20f0ad99f..2b5d031741 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -218,6 +218,10 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation = {
val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
+ val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
+ val parquetOptions = Map(
+ ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
+ ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
// NOTE: Instead of passing Metastore schema directly to `ParquetRelation2`, we have to
// serialize the Metastore schema to JSON and pass it as a data source option because of the
@@ -234,18 +238,10 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
val partitionSpec = PartitionSpec(partitionSchema, partitions)
val paths = partitions.map(_.path)
- LogicalRelation(
- ParquetRelation2(
- paths,
- Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json),
- None,
- Some(partitionSpec))(hive))
+ LogicalRelation(ParquetRelation2(paths, parquetOptions, None, Some(partitionSpec))(hive))
} else {
val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
- LogicalRelation(
- ParquetRelation2(
- paths,
- Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json))(hive))
+ LogicalRelation(ParquetRelation2(paths, parquetOptions)(hive))
}
}