aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2014-03-24 19:24:22 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-03-24 19:24:22 -0700
commit8043b7bc74ff3640743ffc3f1be386dc42f3f44c (patch)
treee4afc1d8f77684ec04661ba6ffca0d6bd9b1058d /sql/catalyst
parent56db8a2f053625fa0c5ba1b0dc64de7d4fc80400 (diff)
downloadspark-8043b7bc74ff3640743ffc3f1be386dc42f3f44c.tar.gz
spark-8043b7bc74ff3640743ffc3f1be386dc42f3f44c.tar.bz2
spark-8043b7bc74ff3640743ffc3f1be386dc42f3f44c.zip
SPARK-1294 Fix resolution of uppercase field names using a HiveContext.
Fixing this bug required the following: - Creation of a new logical node that converts a schema to lowercase. - Generalization of the subquery eliding rule to also elide this new node - Fixing of several places where too tight assumptions were made on the types of `InsertIntoTable` children. - I also removed an API that was left in by accident that exposed catalyst data structures, and fix the logic that pushes down filters into hive tables scans to correctly compare attribute references. Author: Michael Armbrust <michael@databricks.com> Closes #202 from marmbrus/upperCaseFieldNames and squashes the following commits: 15e5265 [Michael Armbrust] Support for resolving mixed case fields from a reflected schema using HiveQL. 5aa5035 [Michael Armbrust] Remove API that exposes internal catalyst data structures. 9d99cb6 [Michael Armbrust] Attributes should be compared using exprId, not TreeNode.id.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala19
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala14
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala30
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala7
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala13
5 files changed, 58 insertions, 25 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index fc76e76617..161d28eba0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -55,7 +55,9 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
StarExpansion ::
ResolveFunctions ::
GlobalAggregates ::
- typeCoercionRules :_*)
+ typeCoercionRules :_*),
+ Batch("AnalysisOperators", fixedPoint,
+ EliminateAnalysisOperators)
)
/**
@@ -80,6 +82,7 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
case s: Star => s.copy(table = s.table.map(_.toLowerCase))
case UnresolvedAttribute(name) => UnresolvedAttribute(name.toLowerCase)
case Alias(c, name) => Alias(c, name.toLowerCase)()
+ case GetField(c, name) => GetField(c, name.toLowerCase)
}
}
}
@@ -184,3 +187,17 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
exprs.collect { case _: Star => true }.nonEmpty
}
}
+
+/**
+ * Removes [[catalyst.plans.logical.Subquery Subquery]] operators from the plan. Subqueries are
+ * only required to provide scoping information for attributes and can be removed once analysis is
+ * complete. Similarly, this rule also removes
+ * [[catalyst.plans.logical.LowerCaseSchema LowerCaseSchema]] operators.
+ */
+object EliminateAnalysisOperators extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case Subquery(_, child) => child
+ case LowerCaseSchema(child) => child
+ }
+}
+
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index c1201971d9..07ebbc90fc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -27,31 +27,17 @@ import org.apache.spark.sql.catalyst.types._
object Optimizer extends RuleExecutor[LogicalPlan] {
val batches =
- Batch("Subqueries", Once,
- EliminateSubqueries) ::
Batch("ConstantFolding", Once,
ConstantFolding,
BooleanSimplification,
SimplifyCasts) ::
Batch("Filter Pushdown", Once,
- EliminateSubqueries,
CombineFilters,
PushPredicateThroughProject,
PushPredicateThroughInnerJoin) :: Nil
}
/**
- * Removes [[catalyst.plans.logical.Subquery Subquery]] operators from the plan. Subqueries are
- * only required to provide scoping information for attributes and can be removed once analysis is
- * complete.
- */
-object EliminateSubqueries extends Rule[LogicalPlan] {
- def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- case Subquery(_, child) => child
- }
-}
-
-/**
* Replaces [[catalyst.expressions.Expression Expressions]] that can be statically evaluated with
* equivalent [[catalyst.expressions.Literal Literal]] values.
*/
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 6480cca300..61481de65e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -21,6 +21,7 @@ package plans
package logical
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
def output = projectList.map(_.toAttribute)
@@ -86,7 +87,7 @@ case class Join(
}
case class InsertIntoTable(
- table: BaseRelation,
+ table: LogicalPlan,
partition: Map[String, Option[String]],
child: LogicalPlan,
overwrite: Boolean)
@@ -141,6 +142,33 @@ case class Subquery(alias: String, child: LogicalPlan) extends UnaryNode {
def references = Set.empty
}
+/**
+ * Converts the schema of `child` to all lowercase. Together with LowercaseAttributeReferences,
+ * this allows for optional case-insensitive attribute resolution. This node can be elided after
+ * analysis.
+ */
+case class LowerCaseSchema(child: LogicalPlan) extends UnaryNode {
+ protected def lowerCaseSchema(dataType: DataType): DataType = dataType match {
+ case StructType(fields) =>
+ StructType(fields.map(f =>
+ StructField(f.name.toLowerCase(), lowerCaseSchema(f.dataType), f.nullable)))
+ case ArrayType(elemType) => ArrayType(lowerCaseSchema(elemType))
+ case otherType => otherType
+ }
+
+ val output = child.output.map {
+ case a: AttributeReference =>
+ AttributeReference(
+ a.name.toLowerCase,
+ lowerCaseSchema(a.dataType),
+ a.nullable)(
+ a.exprId,
+ a.qualifiers)
+ }
+
+ def references = Set.empty
+}
+
case class Sample(fraction: Double, withReplacement: Boolean, seed: Int, child: LogicalPlan)
extends UnaryNode {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
index 2c107b865a..53f760fb4c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
@@ -19,21 +19,22 @@ package org.apache.spark.sql
package catalyst
package optimizer
-import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.types.IntegerType
// For implicit conversions
+import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.dsl.expressions._
class ConstantFoldingSuite extends OptimizerTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
- Batch("Subqueries", Once,
- EliminateSubqueries) ::
+ Batch("AnalysisNodes", Once,
+ EliminateAnalysisOperators) ::
Batch("ConstantFolding", Once,
ConstantFolding,
BooleanSimplification) :: Nil
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 0aa7e4d04d..ae1b2b13dd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql
package catalyst
package optimizer
+
+import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
@@ -31,9 +33,8 @@ class FilterPushdownSuite extends OptimizerTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Subqueries", Once,
- EliminateSubqueries) ::
+ EliminateAnalysisOperators) ::
Batch("Filter Pushdown", Once,
- EliminateSubqueries,
CombineFilters,
PushPredicateThroughProject,
PushPredicateThroughInnerJoin) :: Nil
@@ -172,7 +173,7 @@ class FilterPushdownSuite extends OptimizerTest {
}
val optimized = Optimize(originalQuery.analyze)
- comparePlans(optimizer.EliminateSubqueries(originalQuery.analyze), optimized)
+ comparePlans(analysis.EliminateAnalysisOperators(originalQuery.analyze), optimized)
}
test("joins: conjunctive predicates") {
@@ -191,7 +192,7 @@ class FilterPushdownSuite extends OptimizerTest {
left.join(right, condition = Some("x.b".attr === "y.b".attr))
.analyze
- comparePlans(optimized, optimizer.EliminateSubqueries(correctAnswer))
+ comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer))
}
test("joins: conjunctive predicates #2") {
@@ -210,7 +211,7 @@ class FilterPushdownSuite extends OptimizerTest {
left.join(right, condition = Some("x.b".attr === "y.b".attr))
.analyze
- comparePlans(optimized, optimizer.EliminateSubqueries(correctAnswer))
+ comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer))
}
test("joins: conjunctive predicates #3") {
@@ -233,6 +234,6 @@ class FilterPushdownSuite extends OptimizerTest {
condition = Some("z.a".attr === "x.b".attr))
.analyze
- comparePlans(optimized, optimizer.EliminateSubqueries(correctAnswer))
+ comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer))
}
}