Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala  10
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala    30
2 files changed, 13 insertions, 27 deletions

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index d3cef6e0cb..9fd03ef8ba 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -62,15 +62,17 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
       override val extendedResolutionRules =
         catalog.ParquetConversions ::
         catalog.OrcConversions ::
-        AnalyzeCreateTable(sparkSession) ::
-        PreprocessTableInsertion(conf) ::
-        DataSourceAnalysis(conf) ::
         new DetermineHiveSerde(conf) ::
-        new HiveAnalysis(sparkSession) ::
         new FindDataSourceTable(sparkSession) ::
         new FindHiveSerdeTable(sparkSession) ::
         new ResolveDataSource(sparkSession) :: Nil
 
+      override val postHocResolutionRules =
+        AnalyzeCreateTable(sparkSession) ::
+        PreprocessTableInsertion(conf) ::
+        DataSourceAnalysis(conf) ::
+        new HiveAnalysis(sparkSession) :: Nil
+
       override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))
     }
   }
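
The HiveSessionState hunk above splits the Hive analyzer rules in two:
the remaining extendedResolutionRules stay in the analyzer's fixed-point
resolution batch, while AnalyzeCreateTable, PreprocessTableInsertion,
DataSourceAnalysis and HiveAnalysis move to postHocResolutionRules, which
run once after resolution completes. A minimal Scala sketch of that
two-phase ordering follows; SketchAnalyzer is a hypothetical stand-in,
not Spark's actual Analyzer, which organizes these lists into batches.

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical illustration of rule ordering: resolution rules are
// re-applied until the plan stops changing; post-hoc rules then make a
// single pass, in list order, over the fully resolved plan.
class SketchAnalyzer(
    resolutionRules: Seq[Rule[LogicalPlan]],
    postHocRules: Seq[Rule[LogicalPlan]]) {

  def execute(plan: LogicalPlan): LogicalPlan = {
    var current = plan
    var changed = true
    while (changed) {
      val next = resolutionRules.foldLeft(current)((p, rule) => rule(p))
      changed = !next.fastEquals(current)
      current = next
    }
    // In the real post-hoc list PreprocessTableInsertion precedes
    // HiveAnalysis, so HiveAnalysis only ever sees inserts that have
    // already been normalized.
    postHocRules.foldLeft(current)((p, rule) => rule(p))
  }
}
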
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 838e6f4008..6cde783c5a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -25,10 +25,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.CreateTable
+import org.apache.spark.sql.execution.datasources.{CreateTable, PreprocessTableInsertion}
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
-import org.apache.spark.sql.types.StructType
 
 
 /**
@@ -78,10 +77,14 @@ class DetermineHiveSerde(conf: SQLConf) extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Replaces generic operations with specific variants that are designed to work with Hive.
+ *
+ * Note that this rule must be run after [[PreprocessTableInsertion]].
+ */
 class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, ifNotExists)
-        if hasBeenPreprocessed(table.output, table.partitionKeys.toStructType, partSpec, query) =>
+    case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, ifNotExists) =>
       InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)
 
     case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
@@ -98,25 +101,6 @@ class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
         query,
         mode == SaveMode.Ignore)
   }
-
-  /**
-   * Returns true if the [[InsertIntoTable]] plan has already been preprocessed by analyzer rule
-   * [[PreprocessTableInsertion]]. It is important that this rule([[HiveAnalysis]]) has to
-   * be run after [[PreprocessTableInsertion]], to normalize the column names in partition spec and
-   * fix the schema mismatch by adding Cast.
-   */
-  private def hasBeenPreprocessed(
-      tableOutput: Seq[Attribute],
-      partSchema: StructType,
-      partSpec: Map[String, Option[String]],
-      query: LogicalPlan): Boolean = {
-    val partColNames = partSchema.map(_.name).toSet
-    query.resolved && partSpec.keys.forall(partColNames.contains) && {
-      val staticPartCols = partSpec.filter(_._2.isDefined).keySet
-      val expectedColumns = tableOutput.filterNot(a => staticPartCols.contains(a.name))
-      expectedColumns.toStructType.sameType(query.schema)
-    }
-  }
 }
 
 /**
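
Dropping hasBeenPreprocessed is safe precisely because of the ordering
note in the new Scaladoc: HiveAnalysis now runs as a post-hoc rule listed
after PreprocessTableInsertion, so the partition spec is already
normalized and the casts are already in place, which is the condition the
guard verified with StructType.sameType. For reference, sameType compares
field names and types while ignoring nullability, so a preprocessed query
matches the expected columns even when nullability differs. A small
self-contained illustration (the schemas are made up):

import org.apache.spark.sql.types._

object SameTypeSketch extends App {
  // Columns the insert is expected to produce: table output minus the
  // static partition columns, as computed by the deleted guard.
  val expected = StructType(Seq(
    StructField("id", IntegerType, nullable = false),
    StructField("name", StringType)))

  // Schema of an already-preprocessed query: same names and types,
  // different nullability.
  val produced = StructType(Seq(
    StructField("id", IntegerType, nullable = true),
    StructField("name", StringType)))

  assert(expected.sameType(produced))  // true: nullability is ignored
  assert(!expected.sameType(           // false: the types differ
    StructType(Seq(StructField("id", LongType), StructField("name", StringType)))))
}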