From 652bbb1bf62722b08a062c7a2bf72019f85e179e Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Mon, 9 May 2016 17:01:23 +0800 Subject: [SPARK-14459][SQL] Detect relation partitioning and adjust the logical plan ## What changes were proposed in this pull request? This detects a relation's partitioning and adds checks to the analyzer. If an InsertIntoTable node has no partitioning, it is replaced by the relation's partition scheme and input columns are correctly adjusted, placing the partition columns at the end in partition order. If an InsertIntoTable node has partitioning, it is checked against the table's reported partitions. These changes required adding a PartitionedRelation trait to the catalog interface because Hive's MetastoreRelation doesn't extend CatalogRelation. This commit also includes a fix to InsertIntoTable's resolved logic, which now detects that all expected columns are present, including dynamic partition columns. Previously, the number of expected columns was not checked and resolved was true if there were missing columns. ## How was this patch tested? This adds new tests to the InsertIntoTableSuite that are fixed by this PR. Author: Ryan Blue Closes #12239 from rdblue/SPARK-14459-detect-hive-partitioning. --- .../spark/sql/catalyst/analysis/Analyzer.scala | 41 ++++++++++++++++++++-- .../plans/logical/basicLogicalOperators.scala | 19 ++++++++-- 2 files changed, 54 insertions(+), 6 deletions(-) (limited to 'sql/catalyst') diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 9e9a856286..b7884f9b60 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf} -import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.encoders.OuterScopes import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -444,8 +444,43 @@ class Analyzer( } def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _, _) => - i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u))) + case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved => + val table = lookupTableFromCatalog(u) + // adding the table's partitions or validate the query's partition info + table match { + case relation: CatalogRelation if relation.catalogTable.partitionColumns.nonEmpty => + val tablePartitionNames = relation.catalogTable.partitionColumns.map(_.name) + if (parts.keys.nonEmpty) { + // the query's partitioning must match the table's partitioning + // this is set for queries like: insert into ... partition (one = "a", two = ) + // TODO: add better checking to pre-inserts to avoid needing this here + if (tablePartitionNames.size != parts.keySet.size) { + throw new AnalysisException( + s"""Requested partitioning does not match the ${u.tableIdentifier} table: + |Requested partitions: ${parts.keys.mkString(",")} + |Table partitions: ${tablePartitionNames.mkString(",")}""".stripMargin) + } + // Assume partition columns are correctly placed at the end of the child's output + i.copy(table = EliminateSubqueryAliases(table)) + } else { + // Set up the table's partition scheme with all dynamic partitions by moving partition + // columns to the end of the column list, in partition order. + val (inputPartCols, columns) = child.output.partition { attr => + tablePartitionNames.contains(attr.name) + } + // All partition columns are dynamic because this InsertIntoTable had no partitioning + val partColumns = tablePartitionNames.map { name => + inputPartCols.find(_.name == name).getOrElse( + throw new AnalysisException(s"Cannot find partition column $name")) + } + i.copy( + table = EliminateSubqueryAliases(table), + partition = tablePartitionNames.map(_ -> None).toMap, + child = Project(columns ++ partColumns, child)) + } + case _ => + i.copy(table = EliminateSubqueryAliases(table)) + } case u: UnresolvedRelation => val table = u.tableIdentifier if (table.database.isDefined && conf.runSQLonFile && diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 8b438e40e6..732b0d7919 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -354,10 +354,23 @@ case class InsertIntoTable( override def children: Seq[LogicalPlan] = child :: Nil override def output: Seq[Attribute] = Seq.empty + private[spark] lazy val expectedColumns = { + if (table.output.isEmpty) { + None + } else { + val numDynamicPartitions = partition.values.count(_.isEmpty) + val (partitionColumns, dataColumns) = table.output + .partition(a => partition.keySet.contains(a.name)) + Some(dataColumns ++ partitionColumns.takeRight(numDynamicPartitions)) + } + } + assert(overwrite || !ifNotExists) - override lazy val resolved: Boolean = childrenResolved && child.output.zip(table.output).forall { - case (childAttr, tableAttr) => - DataType.equalsIgnoreCompatibleNullability(childAttr.dataType, tableAttr.dataType) + override lazy val resolved: Boolean = childrenResolved && expectedColumns.forall { expected => + child.output.size == expected.size && child.output.zip(expected).forall { + case (childAttr, tableAttr) => + DataType.equalsIgnoreCompatibleNullability(childAttr.dataType, tableAttr.dataType) + } } } -- cgit v1.2.3