Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala | 10
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala   | 30
2 files changed, 13 insertions, 27 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index d3cef6e0cb..9fd03ef8ba 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -62,15 +62,17 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
     override val extendedResolutionRules =
       catalog.ParquetConversions ::
       catalog.OrcConversions ::
-      AnalyzeCreateTable(sparkSession) ::
-      PreprocessTableInsertion(conf) ::
-      DataSourceAnalysis(conf) ::
       new DetermineHiveSerde(conf) ::
-      new HiveAnalysis(sparkSession) ::
       new FindDataSourceTable(sparkSession) ::
       new FindHiveSerdeTable(sparkSession) ::
       new ResolveDataSource(sparkSession) :: Nil
 
+    override val postHocResolutionRules =
+      AnalyzeCreateTable(sparkSession) ::
+      PreprocessTableInsertion(conf) ::
+      DataSourceAnalysis(conf) ::
+      new HiveAnalysis(sparkSession) :: Nil
+
     override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 838e6f4008..6cde783c5a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -25,10 +25,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.CreateTable
+import org.apache.spark.sql.execution.datasources.{CreateTable, PreprocessTableInsertion}
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
-import org.apache.spark.sql.types.StructType
 
 /**
@@ -78,10 +77,14 @@ class DetermineHiveSerde(conf: SQLConf) extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Replaces generic operations with specific variants that are designed to work with Hive.
+ *
+ * Note that, this rule must be run after [[PreprocessTableInsertion]].
+ */
 class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, ifNotExists)
-        if hasBeenPreprocessed(table.output, table.partitionKeys.toStructType, partSpec, query) =>
+    case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, ifNotExists) =>
       InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)
 
     case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
@@ -98,25 +101,6 @@ class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
       query,
       mode == SaveMode.Ignore)
   }
-
-  /**
-   * Returns true if the [[InsertIntoTable]] plan has already been preprocessed by analyzer rule
-   * [[PreprocessTableInsertion]]. It is important that this rule ([[HiveAnalysis]]) has to
-   * be run after [[PreprocessTableInsertion]], to normalize the column names in partition spec and
-   * fix the schema mismatch by adding Cast.
-   */
-  private def hasBeenPreprocessed(
-      tableOutput: Seq[Attribute],
-      partSchema: StructType,
-      partSpec: Map[String, Option[String]],
-      query: LogicalPlan): Boolean = {
-    val partColNames = partSchema.map(_.name).toSet
-    query.resolved && partSpec.keys.forall(partColNames.contains) && {
-      val staticPartCols = partSpec.filter(_._2.isDefined).keySet
-      val expectedColumns = tableOutput.filterNot(a => staticPartCols.contains(a.name))
-      expectedColumns.toStructType.sameType(query.schema)
-    }
-  }
 }
 
 /**
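Note on the change: HiveAnalysis moves out of the fixed-point extendedResolutionRules batch and into the new postHocResolutionRules batch, whose rules the analyzer applies once, in declaration order, after the main resolution batch has finished. Because PreprocessTableInsertion is listed before HiveAnalysis there, any InsertIntoTable over a MetastoreRelation that reaches HiveAnalysis has already had its partition-spec column names normalized and the needed Casts inserted, which is why the hasBeenPreprocessed guard can be deleted outright. Below is a minimal, self-contained Scala sketch of that ordering contract; the Plan and Rule types and both rule stand-ins are invented here for illustration and are not Spark's Catalyst classes.

// Toy model of a once-through "post-hoc" rule batch. Everything in this
// object is a stand-in invented for this sketch, not Spark's actual API.
object PostHocOrderingSketch {

  sealed trait Plan
  // `preprocessed` models the normalization that PreprocessTableInsertion
  // performs (partition-column name normalization, inserted Casts).
  case class InsertIntoTable(table: String, preprocessed: Boolean) extends Plan
  case class InsertIntoHiveTable(table: String) extends Plan

  type Rule = Plan => Plan

  // Stand-in for PreprocessTableInsertion: normalizes the insert.
  val preprocess: Rule = {
    case InsertIntoTable(t, false) => InsertIntoTable(t, preprocessed = true)
    case other                     => other
  }

  // Stand-in for HiveAnalysis: matches unconditionally, mirroring the removal
  // of the hasBeenPreprocessed guard. Correctness now comes from running
  // after `preprocess` in the same once-through batch, not from a check here.
  val hiveAnalysis: Rule = {
    case InsertIntoTable(t, _) => InsertIntoHiveTable(t)
    case other                 => other
  }

  // Post-hoc batch semantics: each rule applied once, in declaration order.
  def runPostHocBatch(rules: Seq[Rule], plan: Plan): Plan =
    rules.foldLeft(plan)((p, rule) => rule(p))

  def main(args: Array[String]): Unit = {
    val analyzed = runPostHocBatch(
      Seq(preprocess, hiveAnalysis),
      InsertIntoTable("src", preprocessed = false))
    println(analyzed) // InsertIntoHiveTable(src)
  }
}

In the real analyzer the same guarantee comes from batch ordering rather than from any check inside HiveAnalysis itself, which is what the new scaladoc's "must be run after [[PreprocessTableInsertion]]" note records.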