aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2014-08-26 16:29:14 -0700
committerReynold Xin <rxin@apache.org>2014-08-26 16:29:14 -0700
commitc4787a3690a9ed3b8b2c6c294fc4a6915436b6f7 (patch)
tree15b185728ed6e46fd93f795780a6266fc42ffd76 /sql/core/src/main
parent1208f72ac78960fe5060187761479b2a9a417c1b (diff)
downloadspark-c4787a3690a9ed3b8b2c6c294fc4a6915436b6f7.tar.gz
spark-c4787a3690a9ed3b8b2c6c294fc4a6915436b6f7.tar.bz2
spark-c4787a3690a9ed3b8b2c6c294fc4a6915436b6f7.zip
[SPARK-3194][SQL] Add AttributeSet to fix bugs with invalid comparisons of AttributeReferences
It is common to want to describe sets of attributes that are in various parts of a query plan. However, the semantics of putting `AttributeReference` objects into a standard Scala `Set` result in subtle bugs when references differ cosmetically. For example, with case insensitive resolution it is possible to have two references to the same attribute whose names are not equal. In this PR I introduce a new abstraction, an `AttributeSet`, which performs all comparisons using the globally unique `ExpressionId` instead of case class equality. (There is already a related class, [`AttributeMap`](https://github.com/marmbrus/spark/blob/inMemStats/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala#L32)) This new type of set is used to fix a bug in the optimizer where needed attributes were getting projected away underneath join operators. I also took this opportunity to refactor the expression and query plan base classes. In all but one instance the logic for computing the `references` of an `Expression` were the same. Thus, I moved this logic into the base class. For query plans the semantics of the `references` method were ill defined (is it the references output? or is it those used by expression evaluation? or what?). As a result, this method wasn't really used very much. So, I removed it. TODO: - [x] Finish scala doc for `AttributeSet` - [x] Scan the code for other instances of `Set[Attribute]` and refactor them. - [x] Finish removing `references` from `QueryPlan` Author: Michael Armbrust <michael@databricks.com> Closes #2109 from marmbrus/attributeSets and squashes the following commits: 1c0dae5 [Michael Armbrust] work on serialization bug. 9ba868d [Michael Armbrust] Merge remote-tracking branch 'origin/master' into attributeSets 3ae5288 [Michael Armbrust] review comments 40ce7f6 [Michael Armbrust] style d577cc7 [Michael Armbrust] Scaladoc cae5d22 [Michael Armbrust] remove more references implementations d6e16be [Michael Armbrust] Remove more instances of "def references" and normal sets of attributes. fc26b49 [Michael Armbrust] Add AttributeSet class, remove references from Expression.
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala7
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala3
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala2
5 files changed, 5 insertions, 11 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 8a9f4deb6a..6f0eed3f63 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -344,8 +344,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
prunePushedDownFilters: Seq[Expression] => Seq[Expression],
scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = {
- val projectSet = projectList.flatMap(_.references).toSet
- val filterSet = filterPredicates.flatMap(_.references).toSet
+ val projectSet = AttributeSet(projectList.flatMap(_.references))
+ val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
val filterCondition = prunePushedDownFilters(filterPredicates).reduceLeftOption(And)
// Right now we still use a projection even if the only evaluation is applying an alias
@@ -354,7 +354,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
// TODO: Decouple final output schema from expression evaluation so this copy can be
// avoided safely.
- if (projectList.toSet == projectSet && filterSet.subsetOf(projectSet)) {
+ if (AttributeSet(projectList.map(_.toAttribute)) == projectSet &&
+ filterSet.subsetOf(projectSet)) {
// When it is possible to just use column pruning to get the right projection and
// when the columns of this projection are enough to evaluate all filter conditions,
// just do a scan followed by a filter, with no extra project.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index e63b490304..24e88eea31 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -79,8 +79,6 @@ private[sql] case class InMemoryRelation(
override def children = Seq.empty
- override def references = Set.empty
-
override def newInstance() = {
new InMemoryRelation(
output.map(_.newInstance),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 21cbbc9772..7d33ea5b02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -141,10 +141,9 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)(@transient sqlContext: SQ
extends LogicalPlan with MultiInstanceRelation {
def output = alreadyPlanned.output
- override def references = Set.empty
override def children = Nil
- override final def newInstance: this.type = {
+ override final def newInstance(): this.type = {
SparkLogicalPlan(
alreadyPlanned match {
case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index f31df05182..5b896c55b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -58,8 +58,6 @@ package object debug {
}
private[sql] case class DebugNode(child: SparkPlan) extends UnaryNode {
- def references = Set.empty
-
def output = child.output
implicit object SetAccumulatorParam extends AccumulatorParam[HashSet[String]] {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
index b92091b560..aef6ebf86b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -49,7 +49,6 @@ private[spark] case class PythonUDF(
override def toString = s"PythonUDF#$name(${children.mkString(",")})"
def nullable: Boolean = true
- def references: Set[Attribute] = children.flatMap(_.references).toSet
override def eval(input: Row) = sys.error("PythonUDFs can not be directly evaluated.")
}
@@ -113,7 +112,6 @@ private[spark] object ExtractPythonUdfs extends Rule[LogicalPlan] {
case class EvaluatePython(udf: PythonUDF, child: LogicalPlan) extends logical.UnaryNode {
val resultAttribute = AttributeReference("pythonUDF", udf.dataType, nullable=true)()
- def references = Set.empty
def output = child.output :+ resultAttribute
}