diff options
author | Ryan Blue <blue@apache.org> | 2016-05-09 17:01:23 +0800 |
---|---|---|
committer | Cheng Lian <lian@databricks.com> | 2016-05-09 17:01:23 +0800 |
commit | 652bbb1bf62722b08a062c7a2bf72019f85e179e (patch) | |
tree | 5e4d588f2c9070a0c645c176b7bd42dd34fe04b5 /sql/catalyst | |
parent | 16a503cf0af3e7c703d56a1a730e4f3a534f6b3c (diff) | |
download | spark-652bbb1bf62722b08a062c7a2bf72019f85e179e.tar.gz spark-652bbb1bf62722b08a062c7a2bf72019f85e179e.tar.bz2 spark-652bbb1bf62722b08a062c7a2bf72019f85e179e.zip |
[SPARK-14459][SQL] Detect relation partitioning and adjust the logical plan
## What changes were proposed in this pull request?
This detects a relation's partitioning and adds checks to the analyzer.
If an InsertIntoTable node has no partitioning, it is replaced by the
relation's partition scheme and input columns are correctly adjusted,
placing the partition columns at the end in partition order. If an
InsertIntoTable node has partitioning, it is checked against the table's
reported partitions.
These changes required adding a PartitionedRelation trait to the catalog
interface because Hive's MetastoreRelation doesn't extend
CatalogRelation.
This commit also includes a fix to InsertIntoTable's resolved logic,
which now detects that all expected columns are present, including
dynamic partition columns. Previously, the number of expected columns
was not checked and resolved was true if there were missing columns.
## How was this patch tested?
This adds new tests to the InsertIntoTableSuite that are fixed by this PR.
Author: Ryan Blue <blue@apache.org>
Closes #12239 from rdblue/SPARK-14459-detect-hive-partitioning.
Diffstat (limited to 'sql/catalyst')
2 files changed, 54 insertions, 6 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 9e9a856286..b7884f9b60 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf} -import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.encoders.OuterScopes import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -444,8 +444,43 @@ class Analyzer( } def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _, _) => - i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u))) + case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved => + val table = lookupTableFromCatalog(u) + // adding the table's partitions or validate the query's partition info + table match { + case relation: CatalogRelation if relation.catalogTable.partitionColumns.nonEmpty => + val tablePartitionNames = relation.catalogTable.partitionColumns.map(_.name) + if (parts.keys.nonEmpty) { + // the query's partitioning must match the table's partitioning + // this is set for queries like: insert into ... partition (one = "a", two = <expr>) + // TODO: add better checking to pre-inserts to avoid needing this here + if (tablePartitionNames.size != parts.keySet.size) { + throw new AnalysisException( + s"""Requested partitioning does not match the ${u.tableIdentifier} table: + |Requested partitions: ${parts.keys.mkString(",")} + |Table partitions: ${tablePartitionNames.mkString(",")}""".stripMargin) + } + // Assume partition columns are correctly placed at the end of the child's output + i.copy(table = EliminateSubqueryAliases(table)) + } else { + // Set up the table's partition scheme with all dynamic partitions by moving partition + // columns to the end of the column list, in partition order. + val (inputPartCols, columns) = child.output.partition { attr => + tablePartitionNames.contains(attr.name) + } + // All partition columns are dynamic because this InsertIntoTable had no partitioning + val partColumns = tablePartitionNames.map { name => + inputPartCols.find(_.name == name).getOrElse( + throw new AnalysisException(s"Cannot find partition column $name")) + } + i.copy( + table = EliminateSubqueryAliases(table), + partition = tablePartitionNames.map(_ -> None).toMap, + child = Project(columns ++ partColumns, child)) + } + case _ => + i.copy(table = EliminateSubqueryAliases(table)) + } case u: UnresolvedRelation => val table = u.tableIdentifier if (table.database.isDefined && conf.runSQLonFile && diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 8b438e40e6..732b0d7919 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -354,10 +354,23 @@ case class InsertIntoTable( override def children: Seq[LogicalPlan] = child :: Nil override def output: Seq[Attribute] = Seq.empty + private[spark] lazy val expectedColumns = { + if (table.output.isEmpty) { + None + } else { + val numDynamicPartitions = partition.values.count(_.isEmpty) + val (partitionColumns, dataColumns) = table.output + .partition(a => partition.keySet.contains(a.name)) + Some(dataColumns ++ partitionColumns.takeRight(numDynamicPartitions)) + } + } + assert(overwrite || !ifNotExists) - override lazy val resolved: Boolean = childrenResolved && child.output.zip(table.output).forall { - case (childAttr, tableAttr) => - DataType.equalsIgnoreCompatibleNullability(childAttr.dataType, tableAttr.dataType) + override lazy val resolved: Boolean = childrenResolved && expectedColumns.forall { expected => + child.output.size == expected.size && child.output.zip(expected).forall { + case (childAttr, tableAttr) => + DataType.equalsIgnoreCompatibleNullability(childAttr.dataType, tableAttr.dataType) + } } } |