diff options
author | Wenchen Fan <wenchen@databricks.com> | 2017-01-23 20:01:10 -0800 |
---|---|---|
committer | gatorsmile <gatorsmile@gmail.com> | 2017-01-23 20:01:10 -0800 |
commit | fcfd5d0bbaf2fb2437d0eb12e3eba1b52153997c (patch) | |
tree | 40243f6c3c616a6ad1cd89ff33b0f4a3e29f83f4 /sql/hive | |
parent | 49f5b0ae4c31e4b7369104a14e562e1546aa7736 (diff) | |
download | spark-fcfd5d0bbaf2fb2437d0eb12e3eba1b52153997c.tar.gz spark-fcfd5d0bbaf2fb2437d0eb12e3eba1b52153997c.tar.bz2 spark-fcfd5d0bbaf2fb2437d0eb12e3eba1b52153997c.zip |
[SPARK-19290][SQL] add a new extending interface in Analyzer for post-hoc resolution
## What changes were proposed in this pull request?
To implement DDL commands, we added several analyzer rules in sql/hive module to analyze DDL related plans. However, our `Analyzer` currently only have one extending interface: `extendedResolutionRules`, which defines extra rules that will be run together with other rules in the resolution batch, and doesn't fit DDL rules well, because:
1. DDL rules may do some checking and normalization, but we may do it many times as the resolution batch will run rules again and again, until fixed point, and it's hard to tell if a DDL rule has already done its checking and normalization. It's fine because DDL rules are idempotent, but it's bad for analysis performance
2. some DDL rules may depend on others, and it's pretty hard to write `if` conditions to guarantee the dependencies. It will be good if we have a batch which run rules in one pass, so that we can guarantee the dependencies by rules order.
This PR adds a new extending interface in `Analyzer`: `postHocResolutionRules`, which defines rules that will be run only once in a batch runs right after the resolution batch.
## How was this patch tested?
existing tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes #16645 from cloud-fan/analyzer.
Diffstat (limited to 'sql/hive')
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala | 10 | ||||
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala | 30 |
2 files changed, 13 insertions, 27 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index d3cef6e0cb..9fd03ef8ba 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -62,15 +62,17 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) override val extendedResolutionRules = catalog.ParquetConversions :: catalog.OrcConversions :: - AnalyzeCreateTable(sparkSession) :: - PreprocessTableInsertion(conf) :: - DataSourceAnalysis(conf) :: new DetermineHiveSerde(conf) :: - new HiveAnalysis(sparkSession) :: new FindDataSourceTable(sparkSession) :: new FindHiveSerdeTable(sparkSession) :: new ResolveDataSource(sparkSession) :: Nil + override val postHocResolutionRules = + AnalyzeCreateTable(sparkSession) :: + PreprocessTableInsertion(conf) :: + DataSourceAnalysis(conf) :: + new HiveAnalysis(sparkSession) :: Nil + override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog)) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 838e6f4008..6cde783c5a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -25,10 +25,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.DDLUtils -import org.apache.spark.sql.execution.datasources.CreateTable +import org.apache.spark.sql.execution.datasources.{CreateTable, PreprocessTableInsertion} import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} -import org.apache.spark.sql.types.StructType /** @@ -78,10 +77,14 @@ class DetermineHiveSerde(conf: SQLConf) extends Rule[LogicalPlan] { } } +/** + * Replaces generic operations with specific variants that are designed to work with Hive. + * + * Note that, this rule must be run after [[PreprocessTableInsertion]]. + */ class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, ifNotExists) - if hasBeenPreprocessed(table.output, table.partitionKeys.toStructType, partSpec, query) => + case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, ifNotExists) => InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists) case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) => @@ -98,25 +101,6 @@ class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] { query, mode == SaveMode.Ignore) } - - /** - * Returns true if the [[InsertIntoTable]] plan has already been preprocessed by analyzer rule - * [[PreprocessTableInsertion]]. It is important that this rule([[HiveAnalysis]]) has to - * be run after [[PreprocessTableInsertion]], to normalize the column names in partition spec and - * fix the schema mismatch by adding Cast. - */ - private def hasBeenPreprocessed( - tableOutput: Seq[Attribute], - partSchema: StructType, - partSpec: Map[String, Option[String]], - query: LogicalPlan): Boolean = { - val partColNames = partSchema.map(_.name).toSet - query.resolved && partSpec.keys.forall(partColNames.contains) && { - val staticPartCols = partSpec.filter(_._2.isDefined).keySet - val expectedColumns = tableOutput.filterNot(a => staticPartCols.contains(a.name)) - expectedColumns.toStructType.sameType(query.schema) - } - } } /** |