author     Yin Huai <yhuai@databricks.com>            2015-08-29 16:39:40 -0700
committer  Michael Armbrust <michael@databricks.com>  2015-08-29 16:39:58 -0700
commit     3a61e103b8c480596fe38e558d7e4449ec0dc391 (patch)
tree       ae4be6e2734cd88f8fb43d5f37976b3a9fb4fa05
parent     d178e1e77f6d19ae9dafc7b0e26ae5784b288e42 (diff)
[SPARK-10339] [SPARK-10334] [SPARK-10301] [SQL] Partitioned table scan can OOM driver and throw a better error message when users need to enable parquet schema merging
This fixes the problem that scanning a partitioned table puts the driver under high memory pressure and can take down the cluster. With this fix, we can also correctly show the query plan of a query consuming partitioned tables.

https://issues.apache.org/jira/browse/SPARK-10339
https://issues.apache.org/jira/browse/SPARK-10334

Finally, this PR squeezes in a "quick fix" for SPARK-10301. It is not a real fix; it just throws a better error message so users know what to do.

Author: Yin Huai <yhuai@databricks.com>

Closes #8515 from yhuai/partitionedTableScan.

(cherry picked from commit 097a7e36e0bf7290b1879331375bacc905583bd3)
Signed-off-by: Michael Armbrust <michael@databricks.com>
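The user-facing symptom behind SPARK-10339/SPARK-10334 shows up when querying a table with many partitions: the planner previously built a separate scan (each with its own Filter/Project) for every partition, which drove up driver memory and bloated the plan. A minimal way to reproduce the shape of such a query, assuming a local sqlContext and a writable path (both placeholders):

    // Write a small table partitioned by column "b" (path is a placeholder).
    sqlContext.range(100).selectExpr("id AS a", "id % 10 AS b")
      .write.partitionBy("b").parquet(path)

    // Query it with a filter and a projection. After this patch the physical plan
    // shows a single Filter and a single Project on top of one unioned scan,
    // instead of one pair per partition.
    val df = sqlContext.read.parquet(path).filter("a > 5").select("b")
    df.explain(true)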
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala           85
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala   7
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala                15
3 files changed, 65 insertions(+), 42 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 6c1ef6a6df..c58213155d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{SaveMode, Strategy, execution, sources, _}
@@ -121,7 +122,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
projections: Seq[NamedExpression],
filters: Seq[Expression],
partitionColumns: StructType,
- partitions: Array[Partition]) = {
+ partitions: Array[Partition]): SparkPlan = {
val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation]
// Because we are creating one RDD per partition, we need to have a shared HadoopConf.
@@ -130,49 +131,51 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
val confBroadcast =
relation.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
- // Builds RDD[Row]s for each selected partition.
- val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>
- // The table scan operator (PhysicalRDD) which retrieves required columns from data files.
- // Notice that the schema of data files, represented by `relation.dataSchema`, may contain
- // some partition column(s).
- val scan =
- pruneFilterProject(
- logicalRelation,
- projections,
- filters,
- (columns: Seq[Attribute], filters) => {
- val partitionColNames = partitionColumns.fieldNames
-
- // Don't scan any partition columns to save I/O. Here we are being optimistic and
- // assuming partition columns data stored in data files are always consistent with those
- // partition values encoded in partition directory paths.
- val needed = columns.filterNot(a => partitionColNames.contains(a.name))
- val dataRows =
- relation.buildScan(needed.map(_.name).toArray, filters, Array(dir), confBroadcast)
-
- // Merges data values with partition values.
- mergeWithPartitionValues(
- relation.schema,
- columns.map(_.name).toArray,
- partitionColNames,
- partitionValues,
- toCatalystRDD(logicalRelation, needed, dataRows))
- })
-
- scan.execute()
- }
+ // Now, we create a scan builder, which will be used by pruneFilterProject. This scan builder
+ // will union all partitions and attach partition values if needed.
+ val scanBuilder = {
+ (columns: Seq[Attribute], filters: Array[Filter]) => {
+ // Builds RDD[Row]s for each selected partition.
+ val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>
+ val partitionColNames = partitionColumns.fieldNames
+
+ // Don't scan any partition columns to save I/O. Here we are being optimistic and
+ // assuming partition columns data stored in data files are always consistent with those
+ // partition values encoded in partition directory paths.
+ val needed = columns.filterNot(a => partitionColNames.contains(a.name))
+ val dataRows =
+ relation.buildScan(needed.map(_.name).toArray, filters, Array(dir), confBroadcast)
+
+ // Merges data values with partition values.
+ mergeWithPartitionValues(
+ relation.schema,
+ columns.map(_.name).toArray,
+ partitionColNames,
+ partitionValues,
+ toCatalystRDD(logicalRelation, needed, dataRows))
+ }
+
+ val unionedRows =
+ if (perPartitionRows.length == 0) {
+ relation.sqlContext.emptyResult
+ } else {
+ new UnionRDD(relation.sqlContext.sparkContext, perPartitionRows)
+ }
- val unionedRows =
- if (perPartitionRows.length == 0) {
- relation.sqlContext.emptyResult
- } else {
- new UnionRDD(relation.sqlContext.sparkContext, perPartitionRows)
+ unionedRows
}
+ }
+
+ // Create the scan operator. If needed, add Filter and/or Project on top of the scan.
+ // The added Filter/Project is on top of the unioned RDD. We do not want to create
+ // one Filter/Project for every partition.
+ val sparkPlan = pruneFilterProject(
+ logicalRelation,
+ projections,
+ filters,
+ scanBuilder)
- execution.PhysicalRDD.createFromDataSource(
- projections.map(_.toAttribute),
- unionedRows,
- logicalRelation.relation)
+ sparkPlan
}
// TODO: refactor this thing. It is very complicated because it does projection internally.
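
Note on the hunk above: the essential restructuring is that pruneFilterProject is now called once, with a scan builder that builds one RDD per partition and unions them, instead of being called once per partition. Below is a simplified, self-contained Scala sketch of that pattern; all names (PartitionedScanSketch, buildPartitionScan, the stand-in pruneFilterProject and Row) are illustrative placeholders, not Spark SQL internals.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD

    object PartitionedScanSketch {
      type Row = (Int, String)  // stand-in for an SQL row

      // Stand-in for relation.buildScan: the rows for one partition directory.
      def buildPartitionScan(sc: SparkContext, dir: String): RDD[Row] =
        sc.parallelize(Seq((dir.length, dir)))

      // Stand-in for pruneFilterProject: wraps ONE scan with a filter and a projection.
      def pruneFilterProject(scan: RDD[Row]): RDD[String] =
        scan.filter(_._1 > 0).map(_._2)

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
        val partitionDirs = Seq("year=2014", "year=2015")

        // Union the per-partition RDDs first...
        val unioned: RDD[Row] = sc.union(partitionDirs.map(buildPartitionScan(sc, _)))
        // ...then apply Filter/Project once, on top of the union,
        // rather than planning a Filter/Project for every partition.
        pruneFilterProject(unioned).collect().foreach(println)

        sc.stop()
      }
    }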
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index cbf0704c4a..d99bfe4cd8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -196,6 +196,13 @@ private[parquet] class CatalystRowConverter(
}
}
+ if (paddedParquetFields.length != catalystType.length) {
+ throw new UnsupportedOperationException(
+ "A Parquet file's schema has different number of fields with the table schema. " +
+ "Please enable schema merging by setting \"mergeSchema\" to true when load " +
+ "a Parquet dataset or set spark.sql.parquet.mergeSchema to true in SQLConf.")
+ }
+
paddedParquetFields.zip(catalystType).zipWithIndex.map {
case ((parquetFieldType, catalystField), ordinal) =>
// Converted field value should be set to the `ordinal`-th cell of `currentRow`
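
The error message added above points users at Parquet schema merging (the SPARK-10301 quick fix). As a usage note, schema merging can be enabled per read or session-wide; a short sketch, assuming a placeholder path containing Parquet files with different but compatible schemas:

    // Enable schema merging for a single read...
    val merged = sqlContext.read.option("mergeSchema", "true").parquet(path)

    // ...or session-wide, as the new error message suggests.
    sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")
    val mergedViaConf = sqlContext.read.parquet(path)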
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index cb4cedddbf..06dadbb5fe 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -23,7 +23,7 @@ import com.google.common.io.Files
import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.{execution, AnalysisException, SaveMode}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
@@ -136,4 +136,17 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
assert(fs.exists(commonSummaryPath))
}
}
+
+ test("SPARK-10334 Projections and filters should be kept in physical plan") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+
+ sqlContext.range(2).select('id as 'a, 'id as 'b).write.partitionBy("b").parquet(path)
+ val df = sqlContext.read.parquet(path).filter('a === 0).select('b)
+ val physicalPlan = df.queryExecution.executedPlan
+
+ assert(physicalPlan.collect { case p: execution.Project => p }.length === 1)
+ assert(physicalPlan.collect { case p: execution.Filter => p }.length === 1)
+ }
+ }
}