author    Ryan Blue <blue@apache.org>    2016-05-09 17:01:23 +0800
committer Cheng Lian <lian@databricks.com>    2016-05-09 17:01:23 +0800
commit    652bbb1bf62722b08a062c7a2bf72019f85e179e (patch)
tree      5e4d588f2c9070a0c645c176b7bd42dd34fe04b5 /sql/catalyst
parent    16a503cf0af3e7c703d56a1a730e4f3a534f6b3c (diff)
[SPARK-14459][SQL] Detect relation partitioning and adjust the logical plan
## What changes were proposed in this pull request?

This detects a relation's partitioning and adds checks to the analyzer. If an InsertIntoTable node has no partitioning, its partitioning is replaced by the relation's partition scheme and the input columns are adjusted accordingly, placing the partition columns at the end in partition order. If an InsertIntoTable node has partitioning, it is checked against the table's reported partitions.

These changes required adding a PartitionedRelation trait to the catalog interface because Hive's MetastoreRelation doesn't extend CatalogRelation.

This commit also includes a fix to InsertIntoTable's resolved logic, which now verifies that all expected columns are present, including dynamic partition columns. Previously, the number of expected columns was not checked, so resolved could be true even when columns were missing.

## How was this patch tested?

This adds new tests to InsertIntoTableSuite that pass only with this change.

Author: Ryan Blue <blue@apache.org>

Closes #12239 from rdblue/SPARK-14459-detect-hive-partitioning.
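As a rough, self-contained sketch of the user-facing behavior this change targets (the `events`/`staging` table names, columns, and Hive settings below are invented for the example, not taken from the patch):

```scala
import org.apache.spark.sql.SparkSession

object PartitionedInsertExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("partitioned-insert-example")
      .enableHiveSupport()
      .getOrCreate()

    // A partitioned target table and an unpartitioned source (names invented).
    spark.sql(
      "CREATE TABLE IF NOT EXISTS events (id BIGINT, payload STRING) PARTITIONED BY (ds STRING)")
    spark.sql(
      "CREATE TABLE IF NOT EXISTS staging (id BIGINT, ds STRING, payload STRING)")

    // Hive typically requires these settings for fully dynamic partition inserts.
    spark.sql("SET hive.exec.dynamic.partition=true")
    spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")

    // No PARTITION clause: with this patch the analyzer fills in the table's
    // partition scheme as all-dynamic and reorders the input so `ds` comes last.
    spark.sql("INSERT INTO TABLE events SELECT id, ds, payload FROM staging")

    // A PARTITION clause whose keys do not match the table's partition columns
    // now fails analysis instead of resolving with missing columns, e.g.:
    // spark.sql("INSERT INTO TABLE events PARTITION (ds='2016-05-09', hr='00') " +
    //   "SELECT id, payload FROM staging")  // AnalysisException

    spark.stop()
  }
}
```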
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala                     | 41
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala   | 19
2 files changed, 54 insertions, 6 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 9e9a856286..b7884f9b60 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf}
-import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.encoders.OuterScopes
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -444,8 +444,43 @@ class Analyzer(
}
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
- case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
- i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
+ case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
+ val table = lookupTableFromCatalog(u)
+ // add the table's partitions or validate the query's partition info
+ table match {
+ case relation: CatalogRelation if relation.catalogTable.partitionColumns.nonEmpty =>
+ val tablePartitionNames = relation.catalogTable.partitionColumns.map(_.name)
+ if (parts.keys.nonEmpty) {
+ // the query's partitioning must match the table's partitioning
+ // this is set for queries like: insert into ... partition (one = "a", two = <expr>)
+ // TODO: add better checking to pre-inserts to avoid needing this here
+ if (tablePartitionNames.size != parts.keySet.size) {
+ throw new AnalysisException(
+ s"""Requested partitioning does not match the ${u.tableIdentifier} table:
+ |Requested partitions: ${parts.keys.mkString(",")}
+ |Table partitions: ${tablePartitionNames.mkString(",")}""".stripMargin)
+ }
+ // Assume partition columns are correctly placed at the end of the child's output
+ i.copy(table = EliminateSubqueryAliases(table))
+ } else {
+ // Set up the table's partition scheme with all dynamic partitions by moving partition
+ // columns to the end of the column list, in partition order.
+ val (inputPartCols, columns) = child.output.partition { attr =>
+ tablePartitionNames.contains(attr.name)
+ }
+ // All partition columns are dynamic because this InsertIntoTable had no partitioning
+ val partColumns = tablePartitionNames.map { name =>
+ inputPartCols.find(_.name == name).getOrElse(
+ throw new AnalysisException(s"Cannot find partition column $name"))
+ }
+ i.copy(
+ table = EliminateSubqueryAliases(table),
+ partition = tablePartitionNames.map(_ -> None).toMap,
+ child = Project(columns ++ partColumns, child))
+ }
+ case _ =>
+ i.copy(table = EliminateSubqueryAliases(table))
+ }
case u: UnresolvedRelation =>
val table = u.tableIdentifier
if (table.database.isDefined && conf.runSQLonFile &&
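The reordering step in the new analyzer case above can be sketched outside Catalyst with plain strings standing in for attributes; the `reorder` helper, its argument names, and the sample columns below are invented for illustration and are not part of the patch:

```scala
object ReorderForPartitionsSketch {
  // Mirrors the idea of the new case: split the query's output into data and
  // partition columns by name, then re-append the partition columns at the
  // end in the table's declared partition order.
  def reorder(childOutput: Seq[String], tablePartitionNames: Seq[String]): Seq[String] = {
    val (inputPartCols, dataCols) = childOutput.partition(c => tablePartitionNames.contains(c))
    val partCols = tablePartitionNames.map { name =>
      inputPartCols.find(_ == name).getOrElse(
        throw new IllegalArgumentException(s"Cannot find partition column $name"))
    }
    dataCols ++ partCols
  }

  def main(args: Array[String]): Unit = {
    // A query producing (id, ds, payload) for a table partitioned by (ds):
    println(reorder(Seq("id", "ds", "payload"), Seq("ds")))  // List(id, payload, ds)
  }
}
```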
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 8b438e40e6..732b0d7919 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -354,10 +354,23 @@ case class InsertIntoTable(
override def children: Seq[LogicalPlan] = child :: Nil
override def output: Seq[Attribute] = Seq.empty
+ private[spark] lazy val expectedColumns = {
+ if (table.output.isEmpty) {
+ None
+ } else {
+ val numDynamicPartitions = partition.values.count(_.isEmpty)
+ val (partitionColumns, dataColumns) = table.output
+ .partition(a => partition.keySet.contains(a.name))
+ Some(dataColumns ++ partitionColumns.takeRight(numDynamicPartitions))
+ }
+ }
+
assert(overwrite || !ifNotExists)
- override lazy val resolved: Boolean = childrenResolved && child.output.zip(table.output).forall {
- case (childAttr, tableAttr) =>
- DataType.equalsIgnoreCompatibleNullability(childAttr.dataType, tableAttr.dataType)
+ override lazy val resolved: Boolean = childrenResolved && expectedColumns.forall { expected =>
+ child.output.size == expected.size && child.output.zip(expected).forall {
+ case (childAttr, tableAttr) =>
+ DataType.equalsIgnoreCompatibleNullability(childAttr.dataType, tableAttr.dataType)
+ }
}
}
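A similar stand-alone sketch of the new expectedColumns/resolved logic, with a hypothetical `Col(name, dataType)` case class replacing Catalyst attributes (the table, columns, and types below are invented for the example):

```scala
object ExpectedColumnsSketch {
  case class Col(name: String, dataType: String)

  // The expected columns are the table's data columns followed by only the
  // dynamically partitioned columns (those with no value in the PARTITION spec).
  def expectedColumns(
      tableOutput: Seq[Col],
      partitionSpec: Map[String, Option[String]]): Option[Seq[Col]] = {
    if (tableOutput.isEmpty) {
      None
    } else {
      val numDynamicPartitions = partitionSpec.values.count(_.isEmpty)
      val (partitionCols, dataCols) = tableOutput.partition(c => partitionSpec.contains(c.name))
      Some(dataCols ++ partitionCols.takeRight(numDynamicPartitions))
    }
  }

  // The child must produce exactly the expected number of columns, with
  // matching types; a missing dynamic partition column now fails this check.
  def resolved(childOutput: Seq[Col], expected: Option[Seq[Col]]): Boolean =
    expected.forall { exp =>
      childOutput.size == exp.size &&
        childOutput.zip(exp).forall { case (c, t) => c.dataType == t.dataType }
    }

  def main(args: Array[String]): Unit = {
    val table = Seq(Col("id", "bigint"), Col("payload", "string"), Col("ds", "string"))
    val spec = Map("ds" -> Option.empty[String])  // fully dynamic partition
    val exp = expectedColumns(table, spec)
    println(resolved(Seq(Col("id", "bigint"), Col("payload", "string"), Col("ds", "string")), exp)) // true
    println(resolved(Seq(Col("id", "bigint"), Col("payload", "string")), exp))                      // false: ds missing
  }
}
```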