From f5d38c39255cc75325c6639561bfec1bc051f788 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sun, 12 Jun 2016 16:52:15 -0700 Subject: Revert "[SPARK-15753][SQL] Move Analyzer stuff to Analyzer from DataFrameWriter" This reverts commit 0ec279ffdf92853965e327a9f0f6956cacb7a23e. --- .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 17 +++-------------- .../scala/org/apache/spark/sql/DataFrameWriter.scala | 12 +++++++++++- .../spark/sql/hive/execution/HiveQuerySuite.scala | 4 ++-- 3 files changed, 16 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index be527005bc..a575561632 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -452,17 +452,6 @@ class Analyzer( def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved => - // A partitioned relation's schema can be different from the input logicalPlan, since - // partition columns are all moved after data columns. We Project to adjust the ordering. - val input = if (parts.nonEmpty) { - val (inputPartCols, inputDataCols) = child.output.partition { attr => - parts.contains(attr.name) - } - Project(inputDataCols ++ inputPartCols, child) - } else { - child - } - val table = lookupTableFromCatalog(u) // adding the table's partitions or validate the query's partition info table match { @@ -478,8 +467,8 @@ class Analyzer( |Requested partitions: ${parts.keys.mkString(",")} |Table partitions: ${tablePartitionNames.mkString(",")}""".stripMargin) } - // Partition columns are already correctly placed at the end of the child's output - i.copy(table = EliminateSubqueryAliases(table), child = input) + // Assume partition columns are correctly placed at the end of the child's output + i.copy(table = EliminateSubqueryAliases(table)) } else { // Set up the table's partition scheme with all dynamic partitions by moving partition // columns to the end of the column list, in partition order. @@ -497,7 +486,7 @@ class Analyzer( child = Project(columns ++ partColumns, child)) } case _ => - i.copy(table = EliminateSubqueryAliases(table), child = input) + i.copy(table = EliminateSubqueryAliases(table)) } case u: UnresolvedRelation => val table = u.tableIdentifier diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 8c05a7fce3..afae0786b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -506,11 +506,21 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val partitions = normalizedParCols.map(_.map(col => col -> (Option.empty[String])).toMap) val overwrite = mode == SaveMode.Overwrite + // A partitioned relation's schema can be different from the input logicalPlan, since + // partition columns are all moved after data columns. We Project to adjust the ordering. + // TODO: this belongs to the analyzer. + val input = normalizedParCols.map { parCols => + val (inputPartCols, inputDataCols) = df.logicalPlan.output.partition { attr => + parCols.contains(attr.name) + } + Project(inputDataCols ++ inputPartCols, df.logicalPlan) + }.getOrElse(df.logicalPlan) + df.sparkSession.sessionState.executePlan( InsertIntoTable( UnresolvedRelation(tableIdent), partitions.getOrElse(Map.empty[String, Option[String]]), - df.logicalPlan, + input, overwrite, ifNotExists = false)).toRdd } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 0a2bab4f5d..e0f6ccf04d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -1042,7 +1042,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { .queryExecution.analyzed } - assertResult(2, "Duplicated project detected\n" + analyzedPlan) { + assertResult(1, "Duplicated project detected\n" + analyzedPlan) { analyzedPlan.collect { case _: Project => () }.size @@ -1061,7 +1061,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { .queryExecution.analyzed } - assertResult(2, "Duplicated project detected\n" + analyzedPlan) { + assertResult(1, "Duplicated project detected\n" + analyzedPlan) { analyzedPlan.collect { case _: Project => () }.size -- cgit v1.2.3