aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorgatorsmile <gatorsmile@gmail.com>2015-12-28 12:48:30 -0800
committerMichael Armbrust <michael@databricks.com>2015-12-28 12:48:30 -0800
commit01ba95d8bfc16a2542c67b066b0a1d1e465f91da (patch)
tree416aa86355d55123ff54c42a8869f3f9bca410ea /sql/catalyst
parenta6a4812434c6f43cd4742437f957fecd86220255 (diff)
downloadspark-01ba95d8bfc16a2542c67b066b0a1d1e465f91da.tar.gz
spark-01ba95d8bfc16a2542c67b066b0a1d1e465f91da.tar.bz2
spark-01ba95d8bfc16a2542c67b066b0a1d1e465f91da.zip
[SPARK-12441][SQL] Fixing missingInput in Generate/MapPartitions/AppendColumns/MapGroups/CoGroup
When explain any plan with Generate, we will see an exclamation mark in the plan. Normally, when we see this mark, it means the plan has an error. This PR is to correct the `missingInput` in `Generate`. For example, ```scala val df = Seq((1, "a b c"), (2, "a b"), (3, "a")).toDF("number", "letters") val df2 = df.explode('letters) { case Row(letters: String) => letters.split(" ").map(Tuple1(_)).toSeq } df2.explain(true) ``` Before the fix, the plan is like ``` == Parsed Logical Plan == 'Generate UserDefinedGenerator('letters), true, false, None +- Project [_1#0 AS number#2,_2#1 AS letters#3] +- LocalRelation [_1#0,_2#1], [[1,a b c],[2,a b],[3,a]] == Analyzed Logical Plan == number: int, letters: string, _1: string Generate UserDefinedGenerator(letters#3), true, false, None, [_1#8] +- Project [_1#0 AS number#2,_2#1 AS letters#3] +- LocalRelation [_1#0,_2#1], [[1,a b c],[2,a b],[3,a]] == Optimized Logical Plan == Generate UserDefinedGenerator(letters#3), true, false, None, [_1#8] +- LocalRelation [number#2,letters#3], [[1,a b c],[2,a b],[3,a]] == Physical Plan == !Generate UserDefinedGenerator(letters#3), true, false, [number#2,letters#3,_1#8] +- LocalTableScan [number#2,letters#3], [[1,a b c],[2,a b],[3,a]] ``` **Updates**: The same issues are also found in the other four Dataset operators: `MapPartitions`/`AppendColumns`/`MapGroups`/`CoGroup`. Fixed all these four. Author: gatorsmile <gatorsmile@gmail.com> Author: xiaoli <lixiao1983@gmail.com> Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local> Closes #10393 from gatorsmile/generateExplain.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala11
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala4
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala1
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala8
4 files changed, 13 insertions, 11 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index d2626440b9..b43b7ee71e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -44,15 +44,16 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
AttributeSet(children.flatMap(_.asInstanceOf[QueryPlan[PlanType]].output))
/**
+ * The set of all attributes that are produced by this node.
+ */
+ def producedAttributes: AttributeSet = AttributeSet.empty
+
+ /**
* Attributes that are referenced by expressions but not provided by this nodes children.
* Subclasses should override this method if they produce attributes internally as it is used by
* assertions designed to prevent the construction of invalid plans.
- *
- * Note that virtual columns should be excluded. Currently, we only support the grouping ID
- * virtual column.
*/
- def missingInput: AttributeSet =
- (references -- inputSet).filter(_.name != VirtualColumn.groupingIdName)
+ def missingInput: AttributeSet = references -- inputSet -- producedAttributes
/**
* Runs [[transform]] with `rule` on all expressions present in this query operator.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
index e3e7a11dba..572d7d2f0b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
@@ -18,8 +18,8 @@
package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, analysis}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
+import org.apache.spark.sql.catalyst.{analysis, CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.types.{StructField, StructType}
object LocalRelation {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 8f8747e105..6d859551f8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -295,6 +295,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
*/
abstract class LeafNode extends LogicalPlan {
override def children: Seq[LogicalPlan] = Nil
+ override def producedAttributes: AttributeSet = outputSet
}
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 64ef4d7996..5f34d4a4eb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -526,7 +526,7 @@ case class MapPartitions[T, U](
uEncoder: ExpressionEncoder[U],
output: Seq[Attribute],
child: LogicalPlan) extends UnaryNode {
- override def missingInput: AttributeSet = AttributeSet.empty
+ override def producedAttributes: AttributeSet = outputSet
}
/** Factory for constructing new `AppendColumn` nodes. */
@@ -552,7 +552,7 @@ case class AppendColumns[T, U](
newColumns: Seq[Attribute],
child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output ++ newColumns
- override def missingInput: AttributeSet = super.missingInput -- newColumns
+ override def producedAttributes: AttributeSet = AttributeSet(newColumns)
}
/** Factory for constructing new `MapGroups` nodes. */
@@ -587,7 +587,7 @@ case class MapGroups[K, T, U](
groupingAttributes: Seq[Attribute],
output: Seq[Attribute],
child: LogicalPlan) extends UnaryNode {
- override def missingInput: AttributeSet = AttributeSet.empty
+ override def producedAttributes: AttributeSet = outputSet
}
/** Factory for constructing new `CoGroup` nodes. */
@@ -630,5 +630,5 @@ case class CoGroup[Key, Left, Right, Result](
rightGroup: Seq[Attribute],
left: LogicalPlan,
right: LogicalPlan) extends BinaryNode {
- override def missingInput: AttributeSet = AttributeSet.empty
+ override def producedAttributes: AttributeSet = outputSet
}