about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala6
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala5
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala20
3 files changed, 30 insertions, 1 deletion
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 9a10a23937..2c258b6ee3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -227,6 +227,12 @@ private[spark] object SQLConf {
defaultValue = Some(true),
doc = "<TODO>")
+ val PARQUET_SCHEMA_MERGING_ENABLED = booleanConf("spark.sql.parquet.mergeSchema",
+ defaultValue = Some(true),
+ doc = "When true, the Parquet data source merges schemas collected from all data files, " +
+ "otherwise the schema is picked from the summary file or a random data file " +
+ "if no summary file is available.")
+
val PARQUET_BINARY_AS_STRING = booleanConf("spark.sql.parquet.binaryAsString",
defaultValue = Some(false),
doc = "Some other Parquet-producing systems, in particular Impala and older versions of " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index bc39fae2bc..5ac3e9a44e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -114,7 +114,10 @@ private[sql] class ParquetRelation2(
// Should we merge schemas from all Parquet part-files?
private val shouldMergeSchemas =
- parameters.getOrElse(ParquetRelation2.MERGE_SCHEMA, "true").toBoolean
+ parameters
+ .get(ParquetRelation2.MERGE_SCHEMA)
+ .map(_.toBoolean)
+ .getOrElse(sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
private val maybeMetastoreSchema = parameters
.get(ParquetRelation2.METASTORE_SCHEMA)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index fafad67dde..a0a81c4309 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.parquet
+import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql.types._
@@ -122,6 +123,25 @@ class ParquetQuerySuiteBase extends QueryTest with ParquetTest {
checkAnswer(df2, df.collect().toSeq)
}
}
+
+ test("Enabling/disabling schema merging") {
+ def testSchemaMerging(expectedColumnNumber: Int): Unit = {
+ withTempDir { dir =>
+ val basePath = dir.getCanonicalPath
+ sqlContext.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
+ sqlContext.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=2").toString)
+ assert(sqlContext.read.parquet(basePath).columns.length === expectedColumnNumber)
+ }
+ }
+
+ withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
+ testSchemaMerging(3)
+ }
+
+ withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "false") {
+ testSchemaMerging(2)
+ }
+ }
}
class ParquetDataSourceOnQuerySuite extends ParquetQuerySuiteBase with BeforeAndAfterAll {