From 097a7e36e0bf7290b1879331375bacc905583bd3 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Sat, 29 Aug 2015 16:39:40 -0700 Subject: [SPARK-10339] [SPARK-10334] [SPARK-10301] [SQL] Partitioned table scan can OOM driver and throw a better error message when users need to enable parquet schema merging This fixes the problem that scanning partitioned table causes driver have a high memory pressure and takes down the cluster. Also, with this fix, we will be able to correctly show the query plan of a query consuming partitioned tables. https://issues.apache.org/jira/browse/SPARK-10339 https://issues.apache.org/jira/browse/SPARK-10334 Finally, this PR squeeze in a "quick fix" for SPARK-10301. It is not a real fix, but it just throw a better error message to let user know what to do. Author: Yin Huai Closes #8515 from yhuai/partitionedTableScan. --- .../execution/datasources/DataSourceStrategy.scala | 85 +++++++++++----------- .../datasources/parquet/CatalystRowConverter.scala | 7 ++ .../sql/sources/ParquetHadoopFsRelationSuite.scala | 15 +++- 3 files changed, 65 insertions(+), 42 deletions(-) (limited to 'sql') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 6c1ef6a6df..c58213155d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.sql.{SaveMode, Strategy, execution, sources, _} @@ -121,7 +122,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { projections: Seq[NamedExpression], filters: Seq[Expression], partitionColumns: StructType, - partitions: Array[Partition]) = { + partitions: Array[Partition]): SparkPlan = { val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation] // Because we are creating one RDD per partition, we need to have a shared HadoopConf. @@ -130,49 +131,51 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { val confBroadcast = relation.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf)) - // Builds RDD[Row]s for each selected partition. - val perPartitionRows = partitions.map { case Partition(partitionValues, dir) => - // The table scan operator (PhysicalRDD) which retrieves required columns from data files. - // Notice that the schema of data files, represented by `relation.dataSchema`, may contain - // some partition column(s). - val scan = - pruneFilterProject( - logicalRelation, - projections, - filters, - (columns: Seq[Attribute], filters) => { - val partitionColNames = partitionColumns.fieldNames - - // Don't scan any partition columns to save I/O. Here we are being optimistic and - // assuming partition columns data stored in data files are always consistent with those - // partition values encoded in partition directory paths. - val needed = columns.filterNot(a => partitionColNames.contains(a.name)) - val dataRows = - relation.buildScan(needed.map(_.name).toArray, filters, Array(dir), confBroadcast) - - // Merges data values with partition values. - mergeWithPartitionValues( - relation.schema, - columns.map(_.name).toArray, - partitionColNames, - partitionValues, - toCatalystRDD(logicalRelation, needed, dataRows)) - }) - - scan.execute() - } + // Now, we create a scan builder, which will be used by pruneFilterProject. This scan builder + // will union all partitions and attach partition values if needed. + val scanBuilder = { + (columns: Seq[Attribute], filters: Array[Filter]) => { + // Builds RDD[Row]s for each selected partition. + val perPartitionRows = partitions.map { case Partition(partitionValues, dir) => + val partitionColNames = partitionColumns.fieldNames + + // Don't scan any partition columns to save I/O. Here we are being optimistic and + // assuming partition columns data stored in data files are always consistent with those + // partition values encoded in partition directory paths. + val needed = columns.filterNot(a => partitionColNames.contains(a.name)) + val dataRows = + relation.buildScan(needed.map(_.name).toArray, filters, Array(dir), confBroadcast) + + // Merges data values with partition values. + mergeWithPartitionValues( + relation.schema, + columns.map(_.name).toArray, + partitionColNames, + partitionValues, + toCatalystRDD(logicalRelation, needed, dataRows)) + } + + val unionedRows = + if (perPartitionRows.length == 0) { + relation.sqlContext.emptyResult + } else { + new UnionRDD(relation.sqlContext.sparkContext, perPartitionRows) + } - val unionedRows = - if (perPartitionRows.length == 0) { - relation.sqlContext.emptyResult - } else { - new UnionRDD(relation.sqlContext.sparkContext, perPartitionRows) + unionedRows } + } + + // Create the scan operator. If needed, add Filter and/or Project on top of the scan. + // The added Filter/Project is on top of the unioned RDD. We do not want to create + // one Filter/Project for every partition. + val sparkPlan = pruneFilterProject( + logicalRelation, + projections, + filters, + scanBuilder) - execution.PhysicalRDD.createFromDataSource( - projections.map(_.toAttribute), - unionedRows, - logicalRelation.relation) + sparkPlan } // TODO: refactor this thing. It is very complicated because it does projection internally. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala index f682ca0d8f..fe13dfbbed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala @@ -196,6 +196,13 @@ private[parquet] class CatalystRowConverter( } } + if (paddedParquetFields.length != catalystType.length) { + throw new UnsupportedOperationException( + "A Parquet file's schema has different number of fields with the table schema. " + + "Please enable schema merging by setting \"mergeSchema\" to true when load " + + "a Parquet dataset or set spark.sql.parquet.mergeSchema to true in SQLConf.") + } + paddedParquetFields.zip(catalystType).zipWithIndex.map { case ((parquetFieldType, catalystField), ordinal) => // Converted field value should be set to the `ordinal`-th cell of `currentRow` diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala index cb4cedddbf..06dadbb5fe 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala @@ -23,7 +23,7 @@ import com.google.common.io.Files import org.apache.hadoop.fs.Path import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.{execution, AnalysisException, SaveMode} import org.apache.spark.sql.types.{IntegerType, StructField, StructType} @@ -136,4 +136,17 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { assert(fs.exists(commonSummaryPath)) } } + + test("SPARK-10334 Projections and filters should be kept in physical plan") { + withTempPath { dir => + val path = dir.getCanonicalPath + + sqlContext.range(2).select('id as 'a, 'id as 'b).write.partitionBy("b").parquet(path) + val df = sqlContext.read.parquet(path).filter('a === 0).select('b) + val physicalPlan = df.queryExecution.executedPlan + + assert(physicalPlan.collect { case p: execution.Project => p }.length === 1) + assert(physicalPlan.collect { case p: execution.Filter => p }.length === 1) + } + } } -- cgit v1.2.3