author     Wisely Chen <wiselychen@appier.com>    2015-07-02 09:58:12 -0700
committer  Cheng Lian <lian@databricks.com>       2015-07-02 09:58:12 -0700
commit     246265f2bb056d5e9011d3331b809471a24ff8d7 (patch)
tree       73bf1cbe265db29fda5f6e33a4ba074c3be51f90 /sql/core
parent     1bbdf9ead9e912f60dccbb23029b7de4948ebee3 (diff)
[SPARK-8690] [SQL] Add a setting to disable SparkSQL parquet schema merge by using datasource API
The detailed problem story is in https://issues.apache.org/jira/browse/SPARK-8690. Generally speaking, I add a config spark.sql.parquet.mergeSchema that achieves the same effect as

    sqlContext.load("parquet", Map("path" -> "...", "mergeSchema" -> "false"))

It becomes a simple flag without any side effects.

Author: Wisely Chen <wiselychen@appier.com>

Closes #7070 from thegiive/SPARK8690 and squashes the following commits:

c6f3e86 [Wisely Chen] Refactor some code style and merge the test case to ParquetSchemaMergeConfigSuite
94c9307 [Wisely Chen] Remove some style problem
db8ef1b [Wisely Chen] Change config to SQLConf and add test case
b6806fb [Wisely Chen] remove text
c0edb8c [Wisely Chen] [SPARK-8690] add a config spark.sql.parquet.mergeSchema to disable datasource API schema merge feature.
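For illustration, a minimal sketch of the per-relation option against the Spark 1.4-era API this commit targets; the path is hypothetical:

    // Per-relation: pass "mergeSchema" as a data source option at load time.
    val merged = sqlContext.read.option("mergeSchema", "true").parquet("/data/events")  // hypothetical path

    // Equivalent with the load() form quoted in the commit description:
    val unmerged = sqlContext.load("parquet", Map("path" -> "/data/events", "mergeSchema" -> "false"))

When the option is present it takes effect for that relation only, regardless of the session-wide default introduced below.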
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala                    |  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala         |  5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala  | 20
3 files changed, 30 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 9a10a23937..2c258b6ee3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -227,6 +227,12 @@ private[spark] object SQLConf {
     defaultValue = Some(true),
     doc = "<TODO>")
 
+  val PARQUET_SCHEMA_MERGING_ENABLED = booleanConf("spark.sql.parquet.mergeSchema",
+    defaultValue = Some(true),
+    doc = "When true, the Parquet data source merges schemas collected from all data files, " +
+          "otherwise the schema is picked from the summary file or a random data file " +
+          "if no summary file is available.")
+
   val PARQUET_BINARY_AS_STRING = booleanConf("spark.sql.parquet.binaryAsString",
     defaultValue = Some(false),
     doc = "Some other Parquet-producing systems, in particular Impala and older versions of " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index bc39fae2bc..5ac3e9a44e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -114,7 +114,10 @@ private[sql] class ParquetRelation2(
   // Should we merge schemas from all Parquet part-files?
   private val shouldMergeSchemas =
-    parameters.getOrElse(ParquetRelation2.MERGE_SCHEMA, "true").toBoolean
+    parameters
+      .get(ParquetRelation2.MERGE_SCHEMA)
+      .map(_.toBoolean)
+      .getOrElse(sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
 
   private val maybeMetastoreSchema = parameters
     .get(ParquetRelation2.METASTORE_SCHEMA)
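The hunk above gives the per-relation option priority over the new conf. A standalone sketch of that precedence rule, with simplified names (resolveMergeSchema is not part of the patch):

    // Option chain: use the "mergeSchema" option if the caller supplied one,
    // otherwise fall back to the session-wide conf value.
    def resolveMergeSchema(options: Map[String, String], confDefault: Boolean): Boolean =
      options.get("mergeSchema").map(_.toBoolean).getOrElse(confDefault)

    resolveMergeSchema(Map("mergeSchema" -> "false"), confDefault = true)  // false: the option wins
    resolveMergeSchema(Map.empty, confDefault = true)                      // true: the conf default applies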
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index fafad67dde..a0a81c4309 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.parquet
 
+import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.types._
@@ -122,6 +123,25 @@ class ParquetQuerySuiteBase extends QueryTest with ParquetTest {
       checkAnswer(df2, df.collect().toSeq)
     }
   }
+
+  test("Enabling/disabling schema merging") {
+    def testSchemaMerging(expectedColumnNumber: Int): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        sqlContext.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
+        sqlContext.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=2").toString)
+        assert(sqlContext.read.parquet(basePath).columns.length === expectedColumnNumber)
+      }
+    }
+
+    withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
+      testSchemaMerging(3)
+    }
+
+    withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "false") {
+      testSchemaMerging(2)
+    }
+  }
 }
 
 class ParquetDataSourceOnQuerySuite extends ParquetQuerySuiteBase with BeforeAndAfterAll {
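Outside the test harness, the same expectation can be checked interactively; a hedged sketch assuming a spark-shell built with this patch and a hypothetical /tmp/t directory:

    // Two partition directories with disjoint data columns a and b.
    sqlContext.range(0, 10).toDF("a").write.parquet("/tmp/t/foo=1")
    sqlContext.range(0, 10).toDF("b").write.parquet("/tmp/t/foo=2")

    sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")
    sqlContext.read.parquet("/tmp/t").columns   // a, b, foo: merged union plus the partition column

    sqlContext.setConf("spark.sql.parquet.mergeSchema", "false")
    sqlContext.read.parquet("/tmp/t").columns   // one data column plus foo: schema taken from a single footer

This mirrors the test's expected column counts of 3 and 2: disabling merging skips the footer scan across all part-files, which is the performance win motivating SPARK-8690.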