about summary refs log tree commit diff
diff options
context:
space:
mode:
author Wenchen Fan <wenchen@databricks.com> 2016-06-12 16:52:15 -0700
committer Wenchen Fan <wenchen@databricks.com> 2016-06-12 16:52:15 -0700
commitf5d38c39255cc75325c6639561bfec1bc051f788 (patch)
treed0410899e719a7c452f68b68c308ba5324f18b3c
parentcaebd7f2622340fc081bb9a2ea6a0b246f1e3a3f (diff)
downloadspark-f5d38c39255cc75325c6639561bfec1bc051f788.tar.gz
spark-f5d38c39255cc75325c6639561bfec1bc051f788.tar.bz2
spark-f5d38c39255cc75325c6639561bfec1bc051f788.zip
Revert "[SPARK-15753][SQL] Move Analyzer stuff to Analyzer from DataFrameWriter"
This reverts commit 0ec279ffdf92853965e327a9f0f6956cacb7a23e.
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala17
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala12
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala4
3 files changed, 16 insertions, 17 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index be527005bc..a575561632 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -452,17 +452,6 @@ class Analyzer(
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
- // A partitioned relation's schema can be different from the input logicalPlan, since
- // partition columns are all moved after data columns. We Project to adjust the ordering.
- val input = if (parts.nonEmpty) {
- val (inputPartCols, inputDataCols) = child.output.partition { attr =>
- parts.contains(attr.name)
- }
- Project(inputDataCols ++ inputPartCols, child)
- } else {
- child
- }
-
val table = lookupTableFromCatalog(u)
// adding the table's partitions or validate the query's partition info
table match {
@@ -478,8 +467,8 @@ class Analyzer(
|Requested partitions: ${parts.keys.mkString(",")}
|Table partitions: ${tablePartitionNames.mkString(",")}""".stripMargin)
}
- // Partition columns are already correctly placed at the end of the child's output
- i.copy(table = EliminateSubqueryAliases(table), child = input)
+ // Assume partition columns are correctly placed at the end of the child's output
+ i.copy(table = EliminateSubqueryAliases(table))
} else {
// Set up the table's partition scheme with all dynamic partitions by moving partition
// columns to the end of the column list, in partition order.
@@ -497,7 +486,7 @@ class Analyzer(
child = Project(columns ++ partColumns, child))
}
case _ =>
- i.copy(table = EliminateSubqueryAliases(table), child = input)
+ i.copy(table = EliminateSubqueryAliases(table))
}
case u: UnresolvedRelation =>
val table = u.tableIdentifier
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 8c05a7fce3..afae0786b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -506,11 +506,21 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
val partitions = normalizedParCols.map(_.map(col => col -> (Option.empty[String])).toMap)
val overwrite = mode == SaveMode.Overwrite
+ // A partitioned relation's schema can be different from the input logicalPlan, since
+ // partition columns are all moved after data columns. We Project to adjust the ordering.
+ // TODO: this belongs to the analyzer.
+ val input = normalizedParCols.map { parCols =>
+ val (inputPartCols, inputDataCols) = df.logicalPlan.output.partition { attr =>
+ parCols.contains(attr.name)
+ }
+ Project(inputDataCols ++ inputPartCols, df.logicalPlan)
+ }.getOrElse(df.logicalPlan)
+
df.sparkSession.sessionState.executePlan(
InsertIntoTable(
UnresolvedRelation(tableIdent),
partitions.getOrElse(Map.empty[String, Option[String]]),
- df.logicalPlan,
+ input,
overwrite,
ifNotExists = false)).toRdd
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 0a2bab4f5d..e0f6ccf04d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -1042,7 +1042,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
.queryExecution.analyzed
}
- assertResult(2, "Duplicated project detected\n" + analyzedPlan) {
+ assertResult(1, "Duplicated project detected\n" + analyzedPlan) {
analyzedPlan.collect {
case _: Project => ()
}.size
@@ -1061,7 +1061,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
.queryExecution.analyzed
}
- assertResult(2, "Duplicated project detected\n" + analyzedPlan) {
+ assertResult(1, "Duplicated project detected\n" + analyzedPlan) {
analyzedPlan.collect {
case _: Project => ()
}.size