aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala11
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala4
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala1
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala8
4 files changed, 13 insertions, 11 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index d2626440b9..b43b7ee71e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -44,15 +44,16 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
AttributeSet(children.flatMap(_.asInstanceOf[QueryPlan[PlanType]].output))
/**
+ * The set of all attributes that are produced by this node.
+ */
+ def producedAttributes: AttributeSet = AttributeSet.empty
+
+ /**
* Attributes that are referenced by expressions but not provided by this nodes children.
* Subclasses should override this method if they produce attributes internally as it is used by
* assertions designed to prevent the construction of invalid plans.
- *
- * Note that virtual columns should be excluded. Currently, we only support the grouping ID
- * virtual column.
*/
- def missingInput: AttributeSet =
- (references -- inputSet).filter(_.name != VirtualColumn.groupingIdName)
+ def missingInput: AttributeSet = references -- inputSet -- producedAttributes
/**
* Runs [[transform]] with `rule` on all expressions present in this query operator.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
index e3e7a11dba..572d7d2f0b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
@@ -18,8 +18,8 @@
package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, analysis}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
+import org.apache.spark.sql.catalyst.{analysis, CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.types.{StructField, StructType}
object LocalRelation {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 8f8747e105..6d859551f8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -295,6 +295,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
*/
abstract class LeafNode extends LogicalPlan {
override def children: Seq[LogicalPlan] = Nil
+ override def producedAttributes: AttributeSet = outputSet
}
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 64ef4d7996..5f34d4a4eb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -526,7 +526,7 @@ case class MapPartitions[T, U](
uEncoder: ExpressionEncoder[U],
output: Seq[Attribute],
child: LogicalPlan) extends UnaryNode {
- override def missingInput: AttributeSet = AttributeSet.empty
+ override def producedAttributes: AttributeSet = outputSet
}
/** Factory for constructing new `AppendColumn` nodes. */
@@ -552,7 +552,7 @@ case class AppendColumns[T, U](
newColumns: Seq[Attribute],
child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output ++ newColumns
- override def missingInput: AttributeSet = super.missingInput -- newColumns
+ override def producedAttributes: AttributeSet = AttributeSet(newColumns)
}
/** Factory for constructing new `MapGroups` nodes. */
@@ -587,7 +587,7 @@ case class MapGroups[K, T, U](
groupingAttributes: Seq[Attribute],
output: Seq[Attribute],
child: LogicalPlan) extends UnaryNode {
- override def missingInput: AttributeSet = AttributeSet.empty
+ override def producedAttributes: AttributeSet = outputSet
}
/** Factory for constructing new `CoGroup` nodes. */
@@ -630,5 +630,5 @@ case class CoGroup[Key, Left, Right, Result](
rightGroup: Seq[Attribute],
left: LogicalPlan,
right: LogicalPlan) extends BinaryNode {
- override def missingInput: AttributeSet = AttributeSet.empty
+ override def producedAttributes: AttributeSet = outputSet
}