-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala              | 19
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala            | 14
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala   | 30
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala |  7
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala  | 13
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala                            | 18
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala                   |  6
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala                         |  4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala          | 13
9 files changed, 87 insertions(+), 37 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index fc76e76617..161d28eba0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -55,7 +55,9 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
StarExpansion ::
ResolveFunctions ::
GlobalAggregates ::
- typeCoercionRules :_*)
+ typeCoercionRules :_*),
+ Batch("AnalysisOperators", fixedPoint,
+ EliminateAnalysisOperators)
)
/**
@@ -80,6 +82,7 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
case s: Star => s.copy(table = s.table.map(_.toLowerCase))
case UnresolvedAttribute(name) => UnresolvedAttribute(name.toLowerCase)
case Alias(c, name) => Alias(c, name.toLowerCase)()
+ case GetField(c, name) => GetField(c, name.toLowerCase)
}
}
}
@@ -184,3 +187,17 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
exprs.collect { case _: Star => true }.nonEmpty
}
}
+
+/**
+ * Removes [[catalyst.plans.logical.Subquery Subquery]] operators from the plan. Subqueries are
+ * only required to provide scoping information for attributes and can be removed once analysis is
+ * complete. Similarly, this rule also removes
+ * [[catalyst.plans.logical.LowerCaseSchema LowerCaseSchema]] operators.
+ */
+object EliminateAnalysisOperators extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case Subquery(_, child) => child
+ case LowerCaseSchema(child) => child
+ }
+}
+
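For illustration only (not part of the patch), a minimal sketch of what the new AnalysisOperators batch does once analysis has finished: Subquery and LowerCaseSchema exist only to scope and normalize attribute names, so the rule simply peels each wrapper off. The empty-schema LocalRelation is just a stand-in for any resolved relation, used to keep the sketch self-contained.

    import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
    import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LowerCaseSchema, Subquery}

    val relation = LocalRelation(Nil)  // stand-in for any resolved relation

    // Each scoping wrapper collapses to its child; the fixed-point batch
    // repeats the rule until no wrappers remain.
    assert(EliminateAnalysisOperators(Subquery("t", relation)) == relation)
    assert(EliminateAnalysisOperators(LowerCaseSchema(relation)) == relation)
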
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index c1201971d9..07ebbc90fc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -27,31 +27,17 @@ import org.apache.spark.sql.catalyst.types._
object Optimizer extends RuleExecutor[LogicalPlan] {
val batches =
- Batch("Subqueries", Once,
- EliminateSubqueries) ::
Batch("ConstantFolding", Once,
ConstantFolding,
BooleanSimplification,
SimplifyCasts) ::
Batch("Filter Pushdown", Once,
- EliminateSubqueries,
CombineFilters,
PushPredicateThroughProject,
PushPredicateThroughInnerJoin) :: Nil
}
/**
- * Removes [[catalyst.plans.logical.Subquery Subquery]] operators from the plan. Subqueries are
- * only required to provide scoping information for attributes and can be removed once analysis is
- * complete.
- */
-object EliminateSubqueries extends Rule[LogicalPlan] {
- def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- case Subquery(_, child) => child
- }
-}
-
-/**
* Replaces [[catalyst.expressions.Expression Expressions]] that can be statically evaluated with
* equivalent [[catalyst.expressions.Literal Literal]] values.
*/
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 6480cca300..61481de65e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -21,6 +21,7 @@ package plans
package logical
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
def output = projectList.map(_.toAttribute)
@@ -86,7 +87,7 @@ case class Join(
}
case class InsertIntoTable(
- table: BaseRelation,
+ table: LogicalPlan,
partition: Map[String, Option[String]],
child: LogicalPlan,
overwrite: Boolean)
@@ -141,6 +142,33 @@ case class Subquery(alias: String, child: LogicalPlan) extends UnaryNode {
def references = Set.empty
}
+/**
+ * Converts the schema of `child` to all lowercase. Together with the LowercaseAttributeReferences
+ * rule, this allows for optional, case-insensitive attribute resolution. This node can be elided
+ * after analysis.
+ */
+case class LowerCaseSchema(child: LogicalPlan) extends UnaryNode {
+ protected def lowerCaseSchema(dataType: DataType): DataType = dataType match {
+ case StructType(fields) =>
+ StructType(fields.map(f =>
+ StructField(f.name.toLowerCase(), lowerCaseSchema(f.dataType), f.nullable)))
+ case ArrayType(elemType) => ArrayType(lowerCaseSchema(elemType))
+ case otherType => otherType
+ }
+
+ val output = child.output.map {
+ case a: AttributeReference =>
+ AttributeReference(
+ a.name.toLowerCase,
+ lowerCaseSchema(a.dataType),
+ a.nullable)(
+ a.exprId,
+ a.qualifiers)
+ }
+
+ def references = Set.empty
+}
+
case class Sample(fraction: Double, withReplacement: Boolean, seed: Int, child: LogicalPlan)
extends UnaryNode {
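As a hedged illustration of the recursion added above: LowerCaseSchema lowercases field names all the way through nested structs and array element types, while leaving the data types themselves untouched. A standalone restatement, for reference only:

    import org.apache.spark.sql.catalyst.types._

    // Mirrors lowerCaseSchema in LowerCaseSchema; not part of the patch.
    def lower(dataType: DataType): DataType = dataType match {
      case StructType(fields) =>
        StructType(fields.map(f => StructField(f.name.toLowerCase, lower(f.dataType), f.nullable)))
      case ArrayType(elemType) => ArrayType(lower(elemType))
      case other => other
    }

    val mixedCase = StructType(
      StructField("Key", IntegerType, nullable = false) ::
      StructField("Tags", ArrayType(StringType), nullable = true) ::
      StructField("Inner", StructType(StructField("Field", StringType, nullable = true) :: Nil),
        nullable = true) :: Nil)

    // lower(mixedCase) describes struct<key:int, tags:array<string>, inner:struct<field:string>>
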
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
index 2c107b865a..53f760fb4c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
@@ -19,21 +19,22 @@ package org.apache.spark.sql
package catalyst
package optimizer
-import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.types.IntegerType
// For implicit conversions
+import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.dsl.expressions._
class ConstantFoldingSuite extends OptimizerTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
- Batch("Subqueries", Once,
- EliminateSubqueries) ::
+ Batch("AnalysisNodes", Once,
+ EliminateAnalysisOperators) ::
Batch("ConstantFolding", Once,
ConstantFolding,
BooleanSimplification) :: Nil
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 0aa7e4d04d..ae1b2b13dd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql
package catalyst
package optimizer
+
+import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
@@ -31,9 +33,8 @@ class FilterPushdownSuite extends OptimizerTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Subqueries", Once,
- EliminateSubqueries) ::
+ EliminateAnalysisOperators) ::
Batch("Filter Pushdown", Once,
- EliminateSubqueries,
CombineFilters,
PushPredicateThroughProject,
PushPredicateThroughInnerJoin) :: Nil
@@ -172,7 +173,7 @@ class FilterPushdownSuite extends OptimizerTest {
}
val optimized = Optimize(originalQuery.analyze)
- comparePlans(optimizer.EliminateSubqueries(originalQuery.analyze), optimized)
+ comparePlans(analysis.EliminateAnalysisOperators(originalQuery.analyze), optimized)
}
test("joins: conjunctive predicates") {
@@ -191,7 +192,7 @@ class FilterPushdownSuite extends OptimizerTest {
left.join(right, condition = Some("x.b".attr === "y.b".attr))
.analyze
- comparePlans(optimized, optimizer.EliminateSubqueries(correctAnswer))
+ comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer))
}
test("joins: conjunctive predicates #2") {
@@ -210,7 +211,7 @@ class FilterPushdownSuite extends OptimizerTest {
left.join(right, condition = Some("x.b".attr === "y.b".attr))
.analyze
- comparePlans(optimized, optimizer.EliminateSubqueries(correctAnswer))
+ comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer))
}
test("joins: conjunctive predicates #3") {
@@ -233,6 +234,6 @@ class FilterPushdownSuite extends OptimizerTest {
condition = Some("z.a".attr === "x.b".attr))
.analyze
- comparePlans(optimized, optimizer.EliminateSubqueries(correctAnswer))
+ comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer))
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 491b3a6271..af35c919df 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -32,7 +32,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog}
import org.apache.spark.sql.catalyst.expressions.GenericRow
-import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan, LowerCaseSchema}
import org.apache.spark.sql.catalyst.plans.logical.{NativeCommand, ExplainCommand}
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.execution._
@@ -108,18 +108,20 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
/* A catalyst metadata catalog that points to the Hive Metastore. */
@transient
- override lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog
+ override lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog {
+ override def lookupRelation(
+ databaseName: Option[String],
+ tableName: String,
+ alias: Option[String] = None): LogicalPlan = {
+
+ LowerCaseSchema(super.lookupRelation(databaseName, tableName, alias))
+ }
+ }
/* An analyzer that uses the Hive metastore. */
@transient
override lazy val analyzer = new Analyzer(catalog, HiveFunctionRegistry, caseSensitive = false)
- def tables: Seq[BaseRelation] = {
- // TODO: Move this functionallity to Catalog. Make client protected.
- val allTables = catalog.client.getAllTables("default")
- allTables.map(catalog.lookupRelation(None, _, None)).collect { case b: BaseRelation => b }
- }
-
/**
* Runs the specified SQL query using Hive.
*/
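A hedged usage sketch of the combined effect (metastore relations wrapped in LowerCaseSchema plus caseSensitive = false in the Analyzer): mixed-case references to a Hive column now resolve to the same lowercased attribute. This assumes the TestHive fixture and its standard src(key, value) test table:

    import org.apache.spark.sql.hive.TestHive._

    // All three spellings resolve to the lowercased `key` attribute of src.
    sql("SELECT key, KEY, Key FROM src LIMIT 1")
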
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index a5db283765..1667a21729 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -27,7 +27,8 @@ import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde2.Deserializer
-import org.apache.spark.sql.catalyst.analysis.Catalog
+
+import org.apache.spark.sql.catalyst.analysis.{Catalog, EliminateAnalysisOperators}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
@@ -96,7 +97,8 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
createTable(databaseName, tableName, child.output)
InsertIntoTable(
- lookupRelation(Some(databaseName), tableName, None).asInstanceOf[BaseRelation],
+ EliminateAnalysisOperators(
+ lookupRelation(Some(databaseName), tableName, None)),
Map.empty,
child,
overwrite = false)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index c71141c419..3dd0530225 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -73,11 +73,11 @@ trait HiveStrategies {
case p @ FilteredOperation(predicates, relation: MetastoreRelation)
if relation.isPartitioned =>
- val partitionKeyIds = relation.partitionKeys.map(_.id).toSet
+ val partitionKeyIds = relation.partitionKeys.map(_.exprId).toSet
// Filter out all predicates that only deal with partition keys
val (pruningPredicates, otherPredicates) = predicates.partition {
- _.references.map(_.id).subsetOf(partitionKeyIds)
+ _.references.map(_.exprId).subsetOf(partitionKeyIds)
}
val scan = HiveTableScan(
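The switch from `id` to `exprId` matters because pruning compares the attributes referenced by each predicate against the partition-key attributes by expression id. A hedged, self-contained sketch of that split, using hypothetical `ds` (partition key) and `value` columns and the catalyst expression DSL:

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.catalyst.types.IntegerType

    val ds    = AttributeReference("ds", IntegerType, nullable = true)()    // partition key
    val value = AttributeReference("value", IntegerType, nullable = true)() // ordinary column

    val partitionKeyIds = Set(ds.exprId)
    val predicates: Seq[Expression] = Seq(ds === 20140401, value > 5)

    // Predicates touching only partition keys drive partition pruning;
    // the rest remain as ordinary filters over the scanned rows.
    val (pruningPredicates, otherPredicates) = predicates.partition {
      _.references.map(_.exprId).subsetOf(partitionKeyIds)
    }
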
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
index 996bd4efec..4bdea21467 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
@@ -19,6 +19,11 @@ package org.apache.spark.sql
package hive
package execution
+import TestHive._
+
+case class Data(a: Int, B: Int, n: Nested)
+case class Nested(a: Int, B: Int)
+
/**
* A set of test cases expressed in Hive QL that are not covered by the tests included in the hive distribution.
*/
@@ -47,6 +52,14 @@ class HiveResolutionSuite extends HiveComparisonTest {
createQueryTest("alias.*",
"SELECT a.* FROM src a ORDER BY key LIMIT 1")
+ test("case insensitivity with scala reflection") {
+ // Test resolution with Scala Reflection
+ TestHive.sparkContext.parallelize(Data(1, 2, Nested(1,2)) :: Nil)
+ .registerAsTable("caseSensitivityTest")
+
+ sql("SELECT a, b, A, B, n.a, n.b, n.A, n.B FROM caseSensitivityTest")
+ }
+
/**
* Negative examples. Currently only left here for documentation purposes.
* TODO(marmbrus): Test that catalyst fails on these queries.