aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorLiang-Chi Hsieh <viirya@appier.com>2015-07-30 17:45:30 +0800
committerCheng Lian <lian@databricks.com>2015-07-30 17:45:30 +0800
commit6175d6cfe795fbd88e3ee713fac375038a3993a8 (patch)
tree8b3a54239608b5910e36bad44181860453b1b16f /sql/core
parent5ba2d44068b89fd8e81cfd24f49bf20d373f81b9 (diff)
downloadspark-6175d6cfe795fbd88e3ee713fac375038a3993a8.tar.gz
spark-6175d6cfe795fbd88e3ee713fac375038a3993a8.tar.bz2
spark-6175d6cfe795fbd88e3ee713fac375038a3993a8.zip
[SPARK-8838] [SQL] Add config to enable/disable merging part-files when merging parquet schema
JIRA: https://issues.apache.org/jira/browse/SPARK-8838 Currently all part-files are merged when merging the parquet schema. However, there may be many part-files, and we can sometimes guarantee that all the part-files have the same schema as their summary file. For that case, we provide a configuration to disable merging part-files when merging the parquet schema. In short, we need to merge the parquet schema because different summary files may contain different schemas, but the part-files are confirmed to have the same schema as their summary files. Author: Liang-Chi Hsieh <viirya@appier.com> Closes #7238 from viirya/option_partfile_merge and squashes the following commits: 71d5b5f [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge 8816f44 [Liang-Chi Hsieh] For comments. dbc8e6b [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge afc2fa1 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge d4ed7e6 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge df43027 [Liang-Chi Hsieh] Get dataStatuses' partitions based on all paths. 4eb2f00 [Liang-Chi Hsieh] Use given parameter. ea8f6e5 [Liang-Chi Hsieh] Correct the code comments. a57be0e [Liang-Chi Hsieh] Merge part-files if there are no summary files. 47df981 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge 4caf293 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge 0e734e0 [Liang-Chi Hsieh] Use correct API. 3b6be5b [Liang-Chi Hsieh] Fix key not found. 4bdd7e0 [Liang-Chi Hsieh] Don't read footer files if we can skip them. 8bbebcb [Liang-Chi Hsieh] Figure out how to test the config. bbd4ce7 [Liang-Chi Hsieh] Add config to enable/disable merging part-files when merging parquet schema.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala7
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala19
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala27
3 files changed, 52 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index cdb0c7a1c0..2564bbd207 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -247,6 +247,13 @@ private[spark] object SQLConf {
"otherwise the schema is picked from the summary file or a random data file " +
"if no summary file is available.")
+ val PARQUET_SCHEMA_RESPECT_SUMMARIES = booleanConf("spark.sql.parquet.respectSummaryFiles",
+ defaultValue = Some(false),
+ doc = "When true, we assume that all part-files of Parquet are consistent with " +
+ "the summary files, and we will ignore the part-files when merging the schema. " +
+ "Otherwise, if this is false, which is the default, we will merge all part-files. " +
+ "This should be considered an expert-only option, and shouldn't be enabled before " +
+ "knowing what it means exactly.")
+
val PARQUET_BINARY_AS_STRING = booleanConf("spark.sql.parquet.binaryAsString",
defaultValue = Some(false),
doc = "Some other Parquet-producing systems, in particular Impala and older versions of " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index 1a8176d8a8..b4337a48db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -124,6 +124,9 @@ private[sql] class ParquetRelation(
.map(_.toBoolean)
.getOrElse(sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
+ private val mergeRespectSummaries =
+ sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES)
+
private val maybeMetastoreSchema = parameters
.get(ParquetRelation.METASTORE_SCHEMA)
.map(DataType.fromJson(_).asInstanceOf[StructType])
@@ -421,7 +424,21 @@ private[sql] class ParquetRelation(
val filesToTouch =
if (shouldMergeSchemas) {
// Also includes summary files, 'cause there might be empty partition directories.
- (metadataStatuses ++ commonMetadataStatuses ++ dataStatuses).toSeq
+
+ // If the mergeRespectSummaries config is true, we assume that all part-files have the
+ // same schema as their summary files, so we skip the part-files when merging the schema;
+ // in that mode, we only need to merge the schemas contained in the summary files.
+ // If the config is disabled, which is the default setting, we merge all part-files.
+ // You should enable this configuration only if you are very sure that the parquet
+ // part-files to be read have corresponding summary files containing the correct schema.
+
+ val needMerged: Seq[FileStatus] =
+ if (mergeRespectSummaries) {
+ Seq()
+ } else {
+ dataStatuses
+ }
+ (metadataStatuses ++ commonMetadataStatuses ++ needMerged).toSeq
} else {
// Tries any "_common_metadata" first. Parquet files written by old versions or Parquet
// don't have this.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index c037faf4cf..a95f70f2bb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -17,10 +17,13 @@
package org.apache.spark.sql.parquet
+import java.io.File
+
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.types._
import org.apache.spark.sql.{QueryTest, Row, SQLConf}
+import org.apache.spark.util.Utils
/**
* A test suite that tests various Parquet queries.
@@ -123,6 +126,30 @@ class ParquetQuerySuite extends QueryTest with ParquetTest {
}
}
+ test("Enabling/disabling merging partfiles when merging parquet schema") {
+ def testSchemaMerging(expectedColumnNumber: Int): Unit = {
+ withTempDir { dir =>
+ val basePath = dir.getCanonicalPath
+ sqlContext.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
+ sqlContext.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=2").toString)
+ // delete summary files, so if we don't merge part-files, one column will not be included.
+ Utils.deleteRecursively(new File(basePath + "/foo=1/_metadata"))
+ Utils.deleteRecursively(new File(basePath + "/foo=1/_common_metadata"))
+ assert(sqlContext.read.parquet(basePath).columns.length === expectedColumnNumber)
+ }
+ }
+
+ withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
+ SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "true") {
+ testSchemaMerging(2)
+ }
+
+ withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
+ SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "false") {
+ testSchemaMerging(3)
+ }
+ }
+
test("Enabling/disabling schema merging") {
def testSchemaMerging(expectedColumnNumber: Int): Unit = {
withTempDir { dir =>