From 01ba95d8bfc16a2542c67b066b0a1d1e465f91da Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 28 Dec 2015 12:48:30 -0800 Subject: [SPARK-12441][SQL] Fixing missingInput in Generate/MapPartitions/AppendColumns/MapGroups/CoGroup When explaining any plan that contains `Generate`, we see an exclamation mark in the plan. Normally, this mark means the plan has an error. This PR corrects `missingInput` for `Generate`. For example, ```scala val df = Seq((1, "a b c"), (2, "a b"), (3, "a")).toDF("number", "letters") val df2 = df.explode('letters) { case Row(letters: String) => letters.split(" ").map(Tuple1(_)).toSeq } df2.explain(true) ``` Before the fix, the plan looks like this (note the `!` before `Generate` in the physical plan): ``` == Parsed Logical Plan == 'Generate UserDefinedGenerator('letters), true, false, None +- Project [_1#0 AS number#2,_2#1 AS letters#3] +- LocalRelation [_1#0,_2#1], [[1,a b c],[2,a b],[3,a]] == Analyzed Logical Plan == number: int, letters: string, _1: string Generate UserDefinedGenerator(letters#3), true, false, None, [_1#8] +- Project [_1#0 AS number#2,_2#1 AS letters#3] +- LocalRelation [_1#0,_2#1], [[1,a b c],[2,a b],[3,a]] == Optimized Logical Plan == Generate UserDefinedGenerator(letters#3), true, false, None, [_1#8] +- LocalRelation [number#2,letters#3], [[1,a b c],[2,a b],[3,a]] == Physical Plan == !Generate UserDefinedGenerator(letters#3), true, false, [number#2,letters#3,_1#8] +- LocalTableScan [number#2,letters#3], [[1,a b c],[2,a b],[3,a]] ``` **Updates**: The same issue is also found in four Dataset operators: `MapPartitions`/`AppendColumns`/`MapGroups`/`CoGroup`. All four are fixed as well. Author: gatorsmile Author: xiaoli Author: Xiao Li Closes #10393 from gatorsmile/generateExplain. 
--- .../scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 11 ++++++----- .../spark/sql/catalyst/plans/logical/LocalRelation.scala | 4 ++-- .../apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala | 1 + .../spark/sql/catalyst/plans/logical/basicOperators.scala | 8 ++++---- 4 files changed, 13 insertions(+), 11 deletions(-) (limited to 'sql/catalyst') diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index d2626440b9..b43b7ee71e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -43,16 +43,17 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy def inputSet: AttributeSet = AttributeSet(children.flatMap(_.asInstanceOf[QueryPlan[PlanType]].output)) + /** + * The set of all attributes that are produced by this node. + */ + def producedAttributes: AttributeSet = AttributeSet.empty + /** * Attributes that are referenced by expressions but not provided by this nodes children. * Subclasses should override this method if they produce attributes internally as it is used by * assertions designed to prevent the construction of invalid plans. - * - * Note that virtual columns should be excluded. Currently, we only support the grouping ID - * virtual column. */ - def missingInput: AttributeSet = - (references -- inputSet).filter(_.name != VirtualColumn.groupingIdName) + def missingInput: AttributeSet = references -- inputSet -- producedAttributes /** * Runs [[transform]] with `rule` on all expressions present in this query operator. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala index e3e7a11dba..572d7d2f0b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala @@ -18,8 +18,8 @@ package org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, analysis} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet} +import org.apache.spark.sql.catalyst.{analysis, CatalystTypeConverters, InternalRow} import org.apache.spark.sql.types.{StructField, StructType} object LocalRelation { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 8f8747e105..6d859551f8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -295,6 +295,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { */ abstract class LeafNode extends LogicalPlan { override def children: Seq[LogicalPlan] = Nil + override def producedAttributes: AttributeSet = outputSet } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 64ef4d7996..5f34d4a4eb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -526,7 +526,7 @@ case class MapPartitions[T, U]( uEncoder: ExpressionEncoder[U], output: Seq[Attribute], child: LogicalPlan) extends UnaryNode { - override def missingInput: AttributeSet = AttributeSet.empty + override def producedAttributes: AttributeSet = outputSet } /** Factory for constructing new `AppendColumn` nodes. */ @@ -552,7 +552,7 @@ case class AppendColumns[T, U]( newColumns: Seq[Attribute], child: LogicalPlan) extends UnaryNode { override def output: Seq[Attribute] = child.output ++ newColumns - override def missingInput: AttributeSet = super.missingInput -- newColumns + override def producedAttributes: AttributeSet = AttributeSet(newColumns) } /** Factory for constructing new `MapGroups` nodes. */ @@ -587,7 +587,7 @@ case class MapGroups[K, T, U]( groupingAttributes: Seq[Attribute], output: Seq[Attribute], child: LogicalPlan) extends UnaryNode { - override def missingInput: AttributeSet = AttributeSet.empty + override def producedAttributes: AttributeSet = outputSet } /** Factory for constructing new `CoGroup` nodes. */ @@ -630,5 +630,5 @@ case class CoGroup[Key, Left, Right, Result]( rightGroup: Seq[Attribute], left: LogicalPlan, right: LogicalPlan) extends BinaryNode { - override def missingInput: AttributeSet = AttributeSet.empty + override def producedAttributes: AttributeSet = outputSet } -- cgit v1.2.3