author:    Wisely Chen <wiselychen@appier.com>  2015-07-02 09:58:12 -0700
committer: Cheng Lian <lian@databricks.com>     2015-07-02 09:58:12 -0700
commit:    246265f2bb056d5e9011d3331b809471a24ff8d7
tree:      73bf1cbe265db29fda5f6e33a4ba074c3be51f90 /sql/core
parent:    1bbdf9ead9e912f60dccbb23029b7de4948ebee3
[SPARK-8690] [SQL] Add a setting to disable SparkSQL parquet schema merge by using datasource API
The detailed problem description is in https://issues.apache.org/jira/browse/SPARK-8690.
Generally speaking, I added a config spark.sql.parquet.mergeSchema that achieves the same effect as sqlContext.load("parquet", Map("path" -> "...", "mergeSchema" -> "false")).
It is a simple flag with no side effects.
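For illustration, here is a minimal sketch of the two ways the flag can be set after this patch, using the DataFrameReader API available since Spark 1.4; the SQLContext named sqlContext and the table path are placeholder assumptions:

```scala
// Per-query: an explicit data source option always wins.
val perQuery = sqlContext.read
  .option("mergeSchema", "false")           // overrides the session conf
  .parquet("/tmp/parquet_table")            // placeholder path

// Session-wide: the new conf becomes the default for reads
// that do not pass the option themselves.
sqlContext.setConf("spark.sql.parquet.mergeSchema", "false")
val sessionWide = sqlContext.read.parquet("/tmp/parquet_table")
```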
Author: Wisely Chen <wiselychen@appier.com>
Closes #7070 from thegiive/SPARK8690 and squashes the following commits:
c6f3e86 [Wisely Chen] Refactor some code style and merge the test case to ParquetSchemaMergeConfigSuite
94c9307 [Wisely Chen] Remove some style problem
db8ef1b [Wisely Chen] Change config to SQLConf and add test case
b6806fb [Wisely Chen] remove text
c0edb8c [Wisely Chen] [SPARK-8690] add a config spark.sql.parquet.mergeSchema to disable datasource API schema merge feature.
Diffstat (limited to 'sql/core')
3 files changed, 30 insertions, 1 deletion
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 9a10a23937..2c258b6ee3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -227,6 +227,12 @@ private[spark] object SQLConf {
     defaultValue = Some(true),
     doc = "<TODO>")
 
+  val PARQUET_SCHEMA_MERGING_ENABLED = booleanConf("spark.sql.parquet.mergeSchema",
+    defaultValue = Some(true),
+    doc = "When true, the Parquet data source merges schemas collected from all data files, " +
+          "otherwise the schema is picked from the summary file or a random data file " +
+          "if no summary file is available.")
+
   val PARQUET_BINARY_AS_STRING = booleanConf("spark.sql.parquet.binaryAsString",
     defaultValue = Some(false),
     doc = "Some other Parquet-producing systems, in particular Impala and older versions of " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index bc39fae2bc..5ac3e9a44e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -114,7 +114,10 @@ private[sql] class ParquetRelation2(
 
   // Should we merge schemas from all Parquet part-files?
   private val shouldMergeSchemas =
-    parameters.getOrElse(ParquetRelation2.MERGE_SCHEMA, "true").toBoolean
+    parameters
+      .get(ParquetRelation2.MERGE_SCHEMA)
+      .map(_.toBoolean)
+      .getOrElse(sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
 
   private val maybeMetastoreSchema = parameters
     .get(ParquetRelation2.METASTORE_SCHEMA)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index fafad67dde..a0a81c4309 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.parquet
 
+import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.types._
@@ -122,6 +123,25 @@ class ParquetQuerySuiteBase extends QueryTest with ParquetTest {
       checkAnswer(df2, df.collect().toSeq)
     }
   }
+
+  test("Enabling/disabling schema merging") {
+    def testSchemaMerging(expectedColumnNumber: Int): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        sqlContext.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
+        sqlContext.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=2").toString)
+        assert(sqlContext.read.parquet(basePath).columns.length === expectedColumnNumber)
+      }
+    }
+
+    withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
+      testSchemaMerging(3)
+    }
+
+    withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "false") {
+      testSchemaMerging(2)
+    }
+  }
 }
 
 class ParquetDataSourceOnQuerySuite extends ParquetQuerySuiteBase with BeforeAndAfterAll {
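The resolution order in the newParquet.scala change is worth noting: an explicit mergeSchema data source option overrides the session conf, which in turn supplies the default (true). A standalone sketch of that logic, with simplified names that are illustrative rather than Spark APIs:

```scala
// Illustrative resolution order, mirroring the shouldMergeSchemas change:
// explicit per-relation option first, then the session-level conf default.
def resolveMergeSchema(
    options: Map[String, String],  // per-relation options, e.g. "mergeSchema" -> "false"
    sessionConfDefault: Boolean    // value of spark.sql.parquet.mergeSchema
  ): Boolean =
  options.get("mergeSchema").map(_.toBoolean).getOrElse(sessionConfDefault)
```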