author     Liang-Chi Hsieh <simonh@tw.ibm.com>    2016-06-10 11:05:04 -0700
committer  Cheng Lian <lian@databricks.com>       2016-06-10 11:05:04 -0700
commit     0ec279ffdf92853965e327a9f0f6956cacb7a23e
tree       e377cbb45d54717cdbd2471b8868c8dfdd7cc0b9
parent     abdb5d42c5802c8f60876aa1285c803d02881258
[SPARK-15753][SQL] Move Analyzer stuff to Analyzer from DataFrameWriter
## What changes were proposed in this pull request?
This patch moves code out of `DataFrameWriter.insertInto` that belongs in the `Analyzer`.
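In the relocated logic (see the diff below), the child plan of a partitioned `InsertIntoTable` is wrapped in a `Project` that places data columns first and partition columns last. A minimal, standalone sketch of just that reordering, using a hypothetical `Attr` stand-in for Catalyst attributes rather than the real rule:

```scala
// Sketch only: illustrates the column reordering, not the actual Catalyst rule.
case class Attr(name: String)

object PartitionReorderSketch {
  def reorder(output: Seq[Attr], partitionCols: Set[String]): Seq[Attr] =
    if (partitionCols.nonEmpty) {
      // Mirrors the rule's `child.output.partition` followed by
      // `Project(inputDataCols ++ inputPartCols, child)`.
      val (partCols, dataCols) = output.partition(a => partitionCols.contains(a.name))
      dataCols ++ partCols
    } else {
      output
    }

  def main(args: Array[String]): Unit = {
    val childOutput = Seq(Attr("p"), Attr("a"), Attr("b"))
    // Prints: List(Attr(a), Attr(b), Attr(p)) -- partition column moved to the end.
    println(reorder(childOutput, Set("p")))
  }
}
```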
## How was this patch tested?
Existing tests.
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Closes #13496 from viirya/move-analyzer-stuff.
3 files changed, 17 insertions, 16 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d1ca99f79e..58f3904183 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -452,6 +452,17 @@ class Analyzer(
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
       case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
+        // A partitioned relation's schema can be different from the input logicalPlan, since
+        // partition columns are all moved after data columns. We Project to adjust the ordering.
+        val input = if (parts.nonEmpty) {
+          val (inputPartCols, inputDataCols) = child.output.partition { attr =>
+            parts.contains(attr.name)
+          }
+          Project(inputDataCols ++ inputPartCols, child)
+        } else {
+          child
+        }
+
         val table = lookupTableFromCatalog(u)
         // adding the table's partitions or validate the query's partition info
         table match {
@@ -467,8 +478,8 @@ class Analyzer(
                  |Requested partitions: ${parts.keys.mkString(",")}
                  |Table partitions: ${tablePartitionNames.mkString(",")}""".stripMargin)
             }
-            // Assume partition columns are correctly placed at the end of the child's output
-            i.copy(table = EliminateSubqueryAliases(table))
+            // Partition columns are already correctly placed at the end of the child's output
+            i.copy(table = EliminateSubqueryAliases(table), child = input)
           } else {
             // Set up the table's partition scheme with all dynamic partitions by moving partition
             // columns to the end of the column list, in partition order.
@@ -486,7 +497,7 @@ class Analyzer(
                 child = Project(columns ++ partColumns, child))
             }
           case _ =>
-            i.copy(table = EliminateSubqueryAliases(table))
+            i.copy(table = EliminateSubqueryAliases(table), child = input)
         }
       case u: UnresolvedRelation =>
         val table = u.tableIdentifier
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 32e2fdc3f9..6ce59e885a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -505,21 +505,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     val partitions = normalizedParCols.map(_.map(col => col -> (None: Option[String])).toMap)
     val overwrite = mode == SaveMode.Overwrite
-    // A partitioned relation's schema can be different from the input logicalPlan, since
-    // partition columns are all moved after data columns. We Project to adjust the ordering.
-    // TODO: this belongs to the analyzer.
-    val input = normalizedParCols.map { parCols =>
-      val (inputPartCols, inputDataCols) = df.logicalPlan.output.partition { attr =>
-        parCols.contains(attr.name)
-      }
-      Project(inputDataCols ++ inputPartCols, df.logicalPlan)
-    }.getOrElse(df.logicalPlan)
-
     df.sparkSession.sessionState.executePlan(
       InsertIntoTable(
         UnresolvedRelation(tableIdent),
         partitions.getOrElse(Map.empty[String, Option[String]]),
-        input,
+        df.logicalPlan,
         overwrite,
         ifNotExists = false)).toRdd
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index e0f6ccf04d..0a2bab4f5d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -1042,7 +1042,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
         .queryExecution.analyzed
     }
-    assertResult(1, "Duplicated project detected\n" + analyzedPlan) {
+    assertResult(2, "Duplicated project detected\n" + analyzedPlan) {
       analyzedPlan.collect {
         case _: Project => ()
       }.size
@@ -1061,7 +1061,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
         .queryExecution.analyzed
     }
-    assertResult(1, "Duplicated project detected\n" + analyzedPlan) {
+    assertResult(2, "Duplicated project detected\n" + analyzedPlan) {
      analyzedPlan.collect {
         case _: Project => ()
       }.size
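The two `HiveQuerySuite` expectations change from 1 to 2, consistent with the extra `Project` the Analyzer now places on the insert's child. A hedged sketch of the same counting pattern on an analyzed plan (the local session, temp view, and query here are illustrative assumptions, not the suite's fixtures):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.Project

object CountProjects {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("count-projects").getOrCreate()
    spark.range(10).createOrReplaceTempView("src")

    val analyzedPlan = spark.sql("SELECT id FROM src").queryExecution.analyzed

    // Same counting pattern as the suite: collect every Project node in the plan tree.
    val projectCount = analyzedPlan.collect { case _: Project => () }.size
    println(s"Project operators in analyzed plan: $projectCount")

    spark.stop()
  }
}
```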