From 81020144708773ba3af4932288ffa09ef901269e Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 31 Mar 2015 11:21:15 -0700 Subject: [SPARK-6575] [SQL] Adds configuration to disable schema merging while converting metastore Parquet tables Consider a metastore Parquet table that 1. doesn't have schema evolution issue 2. has lots of data files and/or partitions In this case, driver schema merging can be both slow and unnecessary. Would be good to have a configuration to let the use disable schema merging when converting such a metastore Parquet table. [Review on Reviewable](https://reviewable.io/reviews/apache/spark/5231) Author: Cheng Lian Closes #5231 from liancheng/spark-6575 and squashes the following commits: cd96159 [Cheng Lian] Adds configuration to disable schema merging while converting metastore Parquet tables --- .../scala/org/apache/spark/sql/hive/HiveContext.scala | 9 +++++++++ .../org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 16 ++++++---------- 2 files changed, 15 insertions(+), 10 deletions(-) (limited to 'sql') diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index c06c2e396b..6bb1c47dba 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -57,6 +57,15 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { protected[sql] def convertMetastoreParquet: Boolean = getConf("spark.sql.hive.convertMetastoreParquet", "true") == "true" + /** + * When true, also tries to merge possibly different but compatible Parquet schemas in different + * Parquet data files. + * + * This configuration is only effective when "spark.sql.hive.convertMetastoreParquet" is true. + */ + protected[sql] def convertMetastoreParquetWithSchemaMerging: Boolean = + getConf("spark.sql.hive.convertMetastoreParquet.mergeSchema", "false") == "true" + /** * When true, a table created by a Hive CTAS statement (no USING clause) will be * converted to a data source table, using the data source set by spark.sql.sources.default. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index f20f0ad99f..2b5d031741 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -218,6 +218,10 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation = { val metastoreSchema = StructType.fromAttributes(metastoreRelation.output) + val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging + val parquetOptions = Map( + ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json, + ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString) // NOTE: Instead of passing Metastore schema directly to `ParquetRelation2`, we have to // serialize the Metastore schema to JSON and pass it as a data source option because of the @@ -234,18 +238,10 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with } val partitionSpec = PartitionSpec(partitionSchema, partitions) val paths = partitions.map(_.path) - LogicalRelation( - ParquetRelation2( - paths, - Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json), - None, - Some(partitionSpec))(hive)) + LogicalRelation(ParquetRelation2(paths, parquetOptions, None, Some(partitionSpec))(hive)) } else { val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString) - LogicalRelation( - ParquetRelation2( - paths, - Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json))(hive)) + LogicalRelation(ParquetRelation2(paths, parquetOptions)(hive)) } } -- cgit v1.2.3