diff options
author | Michael Armbrust <michael@databricks.com> | 2014-08-26 16:29:14 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-08-26 16:29:14 -0700 |
commit | c4787a3690a9ed3b8b2c6c294fc4a6915436b6f7 (patch) | |
tree | 15b185728ed6e46fd93f795780a6266fc42ffd76 /sql/hive | |
parent | 1208f72ac78960fe5060187761479b2a9a417c1b (diff) | |
download | spark-c4787a3690a9ed3b8b2c6c294fc4a6915436b6f7.tar.gz spark-c4787a3690a9ed3b8b2c6c294fc4a6915436b6f7.tar.bz2 spark-c4787a3690a9ed3b8b2c6c294fc4a6915436b6f7.zip |
[SPARK-3194][SQL] Add AttributeSet to fix bugs with invalid comparisons of AttributeReferences
It is common to want to describe sets of attributes that are in various parts of a query plan. However, the semantics of putting `AttributeReference` objects into a standard Scala `Set` result in subtle bugs when references differ cosmetically. For example, with case insensitive resolution it is possible to have two references to the same attribute whose names are not equal.
In this PR I introduce a new abstraction, an `AttributeSet`, which performs all comparisons using the globally unique `ExpressionId` instead of case class equality. (There is already a related class, [`AttributeMap`](https://github.com/marmbrus/spark/blob/inMemStats/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala#L32)) This new type of set is used to fix a bug in the optimizer where needed attributes were getting projected away underneath join operators.
I also took this opportunity to refactor the expression and query plan base classes. In all but one instance the logic for computing the `references` of an `Expression` were the same. Thus, I moved this logic into the base class.
For query plans the semantics of the `references` method were ill defined (is it the references output? or is it those used by expression evaluation? or what?). As a result, this method wasn't really used very much. So, I removed it.
TODO:
- [x] Finish scala doc for `AttributeSet`
- [x] Scan the code for other instances of `Set[Attribute]` and refactor them.
- [x] Finish removing `references` from `QueryPlan`
Author: Michael Armbrust <michael@databricks.com>
Closes #2109 from marmbrus/attributeSets and squashes the following commits:
1c0dae5 [Michael Armbrust] work on serialization bug.
9ba868d [Michael Armbrust] Merge remote-tracking branch 'origin/master' into attributeSets
3ae5288 [Michael Armbrust] review comments
40ce7f6 [Michael Armbrust] style
d577cc7 [Michael Armbrust] Scaladoc
cae5d22 [Michael Armbrust] remove more references implementations
d6e16be [Michael Armbrust] Remove more instances of "def references" and normal sets of attributes.
fc26b49 [Michael Armbrust] Add AttributeSet class, remove references from Expression.
Diffstat (limited to 'sql/hive')
3 files changed, 12 insertions, 10 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 389ace726d..10fa8314c9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -79,9 +79,9 @@ private[hive] trait HiveStrategies { hiveContext.convertMetastoreParquet => // Filter out all predicates that only deal with partition keys - val partitionKeyIds = relation.partitionKeys.map(_.exprId).toSet + val partitionsKeys = AttributeSet(relation.partitionKeys) val (pruningPredicates, otherPredicates) = predicates.partition { - _.references.map(_.exprId).subsetOf(partitionKeyIds) + _.references.subsetOf(partitionsKeys) } // We are going to throw the predicates and projection back at the whole optimization @@ -176,9 +176,9 @@ private[hive] trait HiveStrategies { case PhysicalOperation(projectList, predicates, relation: MetastoreRelation) => // Filter out all predicates that only deal with partition keys, these are given to the // hive table scan operator to be used for partition pruning. - val partitionKeyIds = relation.partitionKeys.map(_.exprId).toSet + val partitionKeyIds = AttributeSet(relation.partitionKeys) val (pruningPredicates, otherPredicates) = predicates.partition { - _.references.map(_.exprId).subsetOf(partitionKeyIds) + _.references.subsetOf(partitionKeyIds) } pruneFilterProject( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala index c6497a15ef..7d1ad53d8b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala @@ -88,7 +88,6 @@ private[hive] abstract class HiveUdf extends Expression with Logging with HiveFu type EvaluatedType = Any def nullable = true - def references = children.flatMap(_.references).toSet lazy val function = createFunction[UDFType]() @@ -229,8 +228,6 @@ private[hive] case class HiveGenericUdaf( def nullable: Boolean = true - def references: Set[Attribute] = children.map(_.references).flatten.toSet - override def toString = s"$nodeName#$functionClassName(${children.mkString(",")})" def newInstance() = new HiveUdafFunction(functionClassName, children, this) @@ -253,8 +250,6 @@ private[hive] case class HiveGenericUdtf( children: Seq[Expression]) extends Generator with HiveInspectors with HiveFunctionFactory { - override def references = children.flatMap(_.references).toSet - @transient protected lazy val function: GenericUDTF = createFunction() diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala index 6b3ffd1c0f..b6be6bc1bf 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala @@ -20,8 +20,8 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ -case class Data(a: Int, B: Int, n: Nested, nestedArray: Seq[Nested]) case class Nested(a: Int, B: Int) +case class Data(a: Int, B: Int, n: Nested, nestedArray: Seq[Nested]) /** * A set of test cases expressed in Hive QL that are not covered by the tests included in the hive distribution. @@ -57,6 +57,13 @@ class HiveResolutionSuite extends HiveComparisonTest { .registerTempTable("caseSensitivityTest") sql("SELECT a, b, A, B, n.a, n.b, n.A, n.B FROM caseSensitivityTest") + + println(sql("SELECT * FROM casesensitivitytest one JOIN casesensitivitytest two ON one.a = two.a").queryExecution) + + sql("SELECT * FROM casesensitivitytest one JOIN casesensitivitytest two ON one.a = two.a").collect() + + // TODO: sql("SELECT * FROM casesensitivitytest a JOIN casesensitivitytest b ON a.a = b.a") + } test("nested repeated resolution") { |