path: root/sql/hive
author    Wenchen Fan <wenchen@databricks.com>    2017-01-23 20:01:10 -0800
committer gatorsmile <gatorsmile@gmail.com>    2017-01-23 20:01:10 -0800
commit    fcfd5d0bbaf2fb2437d0eb12e3eba1b52153997c (patch)
tree      40243f6c3c616a6ad1cd89ff33b0f4a3e29f83f4 /sql/hive
parent    49f5b0ae4c31e4b7369104a14e562e1546aa7736 (diff)
[SPARK-19290][SQL] add a new extending interface in Analyzer for post-hoc resolution
## What changes were proposed in this pull request?

To implement DDL commands, we added several analyzer rules in the sql/hive module to analyze DDL-related plans. However, our `Analyzer` currently has only one extending interface, `extendedResolutionRules`, which defines extra rules that run together with the other rules in the resolution batch. This doesn't fit DDL rules well, because:

1. DDL rules may do some checking and normalization, but they may do it many times, since the resolution batch runs its rules again and again until a fixed point is reached, and it's hard to tell whether a DDL rule has already done its checking and normalization. This is still correct because DDL rules are idempotent, but it's bad for analysis performance.
2. Some DDL rules may depend on others, and it's pretty hard to write `if` conditions to guarantee the dependencies. It would be good to have a batch that runs its rules in one pass, so that the dependencies can be guaranteed by rule order.

This PR adds a new extending interface to `Analyzer`: `postHocResolutionRules`, which defines rules that run only once, in a batch that runs right after the resolution batch.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16645 from cloud-fan/analyzer.
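The core of the change is the distinction between a fixed-point resolution batch and a once-run post-hoc batch. Below is a minimal, self-contained Scala sketch of those two batch strategies; the names (`Rule`, `Batch`, `FixedPoint`, `Once`) only mirror Catalyst's concepts and this is not Spark's actual `RuleExecutor`/`Analyzer` API.

```scala
// Illustrative sketch only: not Spark's RuleExecutor. A "plan" here is any
// value of type P, and a rule is just a function P => P.
object BatchSketch {
  type Rule[P] = P => P

  sealed trait Strategy
  final case class FixedPoint(maxIterations: Int) extends Strategy
  case object Once extends Strategy

  final case class Batch[P](name: String, strategy: Strategy, rules: Seq[Rule[P]])

  def execute[P](plan: P, batches: Seq[Batch[P]]): P =
    batches.foldLeft(plan) { (current, batch) =>
      def runOnce(p: P): P = batch.rules.foldLeft(p)((acc, rule) => rule(acc))
      batch.strategy match {
        case Once =>
          // Post-hoc rules run exactly once, in declaration order, so a later
          // rule can rely on an earlier one having already run.
          runOnce(current)
        case FixedPoint(max) =>
          // Resolution rules are re-applied until the plan stops changing (or
          // the iteration limit is reached), so each rule must be idempotent
          // and may be invoked many times on an already-normalized plan.
          var prev = current
          var next = runOnce(prev)
          var iters = 1
          while (next != prev && iters < max) {
            prev = next
            next = runOnce(prev)
            iters += 1
          }
          next
      }
    }

  // Purely illustrative usage: a fixed-point "resolution" batch over strings,
  // followed by a once-run "post-hoc" batch whose rule order is guaranteed.
  def main(args: Array[String]): Unit = {
    val resolution = Batch[String]("Resolution", FixedPoint(100), Seq(_.trim, _.toLowerCase))
    val postHoc    = Batch[String]("Post-Hoc Resolution", Once, Seq(_ + "!", _ + "?"))
    println(execute("  SELECT 1 ", Seq(resolution, postHoc))) // prints "select 1!?"
  }
}
```

In the patch below, the DDL-related rules (`AnalyzeCreateTable`, `PreprocessTableInsertion`, `DataSourceAnalysis`, `HiveAnalysis`) move from the fixed-point `extendedResolutionRules` list to the once-run `postHocResolutionRules` list, which is what lets `HiveAnalysis` drop its `hasBeenPreprocessed` guard.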
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala | 10
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala   | 30
2 files changed, 13 insertions(+), 27 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index d3cef6e0cb..9fd03ef8ba 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -62,15 +62,17 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
override val extendedResolutionRules =
catalog.ParquetConversions ::
catalog.OrcConversions ::
- AnalyzeCreateTable(sparkSession) ::
- PreprocessTableInsertion(conf) ::
- DataSourceAnalysis(conf) ::
new DetermineHiveSerde(conf) ::
- new HiveAnalysis(sparkSession) ::
new FindDataSourceTable(sparkSession) ::
new FindHiveSerdeTable(sparkSession) ::
new ResolveDataSource(sparkSession) :: Nil
+ override val postHocResolutionRules =
+ AnalyzeCreateTable(sparkSession) ::
+ PreprocessTableInsertion(conf) ::
+ DataSourceAnalysis(conf) ::
+ new HiveAnalysis(sparkSession) :: Nil
+
override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 838e6f4008..6cde783c5a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -25,10 +25,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.CreateTable
+import org.apache.spark.sql.execution.datasources.{CreateTable, PreprocessTableInsertion}
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
-import org.apache.spark.sql.types.StructType
/**
@@ -78,10 +77,14 @@ class DetermineHiveSerde(conf: SQLConf) extends Rule[LogicalPlan] {
}
}
+/**
+ * Replaces generic operations with specific variants that are designed to work with Hive.
+ *
+ * Note that, this rule must be run after [[PreprocessTableInsertion]].
+ */
class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
- case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, ifNotExists)
- if hasBeenPreprocessed(table.output, table.partitionKeys.toStructType, partSpec, query) =>
+ case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, ifNotExists) =>
InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)
case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
@@ -98,25 +101,6 @@ class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
query,
mode == SaveMode.Ignore)
}
-
- /**
- * Returns true if the [[InsertIntoTable]] plan has already been preprocessed by analyzer rule
- * [[PreprocessTableInsertion]]. It is important that this rule([[HiveAnalysis]]) has to
- * be run after [[PreprocessTableInsertion]], to normalize the column names in partition spec and
- * fix the schema mismatch by adding Cast.
- */
- private def hasBeenPreprocessed(
- tableOutput: Seq[Attribute],
- partSchema: StructType,
- partSpec: Map[String, Option[String]],
- query: LogicalPlan): Boolean = {
- val partColNames = partSchema.map(_.name).toSet
- query.resolved && partSpec.keys.forall(partColNames.contains) && {
- val staticPartCols = partSpec.filter(_._2.isDefined).keySet
- val expectedColumns = tableOutput.filterNot(a => staticPartCols.contains(a.name))
- expectedColumns.toStructType.sameType(query.schema)
- }
- }
}
/**