author     Michael Armbrust <michael@databricks.com>  2014-03-24 19:24:22 -0700
committer  Patrick Wendell <pwendell@gmail.com>       2014-03-24 19:24:22 -0700
commit     8043b7bc74ff3640743ffc3f1be386dc42f3f44c
tree       e4afc1d8f77684ec04661ba6ffca0d6bd9b1058d
parent     56db8a2f053625fa0c5ba1b0dc64de7d4fc80400
SPARK-1294 Fix resolution of uppercase field names using a HiveContext.
Fixing this bug required the following:

- Creation of a new logical node that converts a schema to lowercase.
- Generalization of the subquery eliding rule to also elide this new node.
- Fixing several places where overly tight assumptions were made about the types of `InsertIntoTable` children.
- Removal of an API that was left in by accident and exposed catalyst data structures, and a fix to the logic that pushes filters down into Hive table scans so that attribute references are compared correctly.

Author: Michael Armbrust <michael@databricks.com>

Closes #202 from marmbrus/upperCaseFieldNames and squashes the following commits:

15e5265 [Michael Armbrust] Support for resolving mixed case fields from a reflected schema using HiveQL.
5aa5035 [Michael Armbrust] Remove API that exposes internal catalyst data structures.
9d99cb6 [Michael Armbrust] Attributes should be compared using exprId, not TreeNode.id.
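The heart of the fix is the first bullet. A minimal sketch of the idea behind the new node follows; the real LowerCaseSchema lives in catalyst and is not part of this diff, and the Plan/Relation/Attribute types here are illustrative stand-ins only.

// A minimal sketch of the idea behind the new logical node: wrap a
// child plan and expose its attributes with lower-cased names, so a
// case-insensitive analyzer can resolve any spelling of a field.
object LowerCaseSchemaSketch {
  case class Attribute(name: String)

  trait Plan { def output: Seq[Attribute] }
  case class Relation(output: Seq[Attribute]) extends Plan

  case class LowerCaseSchema(child: Plan) extends Plan {
    def output: Seq[Attribute] =
      child.output.map(a => a.copy(name = a.name.toLowerCase))
  }
}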
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala                   | 18
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala          |  6
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala                |  4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala | 13
4 files changed, 29 insertions(+), 12 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 491b3a6271..af35c919df 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -32,7 +32,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog}
import org.apache.spark.sql.catalyst.expressions.GenericRow
-import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan, LowerCaseSchema}
import org.apache.spark.sql.catalyst.plans.logical.{NativeCommand, ExplainCommand}
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.execution._
@@ -108,18 +108,20 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
/* A catalyst metadata catalog that points to the Hive Metastore. */
@transient
- override lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog
+ override lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog {
+ override def lookupRelation(
+ databaseName: Option[String],
+ tableName: String,
+ alias: Option[String] = None): LogicalPlan = {
+
+ LowerCaseSchema(super.lookupRelation(databaseName, tableName, alias))
+ }
+ }
/* An analyzer that uses the Hive metastore. */
@transient
override lazy val analyzer = new Analyzer(catalog, HiveFunctionRegistry, caseSensitive = false)
- def tables: Seq[BaseRelation] = {
- // TODO: Move this functionallity to Catalog. Make client protected.
- val allTables = catalog.client.getAllTables("default")
- allTables.map(catalog.lookupRelation(None, _, None)).collect { case b: BaseRelation => b }
- }
-
/**
* Runs the specified SQL query using Hive.
*/
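The override above decorates every relation the metastore catalog returns. A hedged sketch of the same pattern, using hypothetical stand-in types rather than Spark's actual Catalog API:

// Every relation the catalog hands out is wrapped in the lowercasing
// node before the analyzer ever sees it (toy types, not Spark's).
object CatalogSketch {
  sealed trait Plan
  case class Relation(table: String) extends Plan
  case class LowerCaseSchema(child: Plan) extends Plan

  class SimpleCatalog {
    def lookupRelation(
        databaseName: Option[String],
        tableName: String,
        alias: Option[String]): Plan = Relation(tableName)
  }

  val catalog = new SimpleCatalog {
    override def lookupRelation(
        databaseName: Option[String],
        tableName: String,
        alias: Option[String]): Plan =
      LowerCaseSchema(super.lookupRelation(databaseName, tableName, alias))
  }
}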
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index a5db283765..1667a21729 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -27,7 +27,8 @@ import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde2.Deserializer
-import org.apache.spark.sql.catalyst.analysis.Catalog
+
+import org.apache.spark.sql.catalyst.analysis.{Catalog, EliminateAnalysisOperators}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
@@ -96,7 +97,8 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
createTable(databaseName, tableName, child.output)
InsertIntoTable(
- lookupRelation(Some(databaseName), tableName, None).asInstanceOf[BaseRelation],
+ EliminateAnalysisOperators(
+ lookupRelation(Some(databaseName), tableName, None)),
Map.empty,
child,
overwrite = false)
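Because lookupRelation now returns a LowerCaseSchema wrapper, the old `asInstanceOf[BaseRelation]` cast would fail; EliminateAnalysisOperators strips such analysis-only wrappers first, per the generalized eliding rule mentioned in the commit message. A sketch of that kind of rule, again with toy plan types rather than catalyst's:

// Recursively strip the analysis-only wrapper so downstream code that
// pattern-matches on concrete relations sees the bare node.
object ElideSketch {
  sealed trait Plan
  case class Relation(table: String) extends Plan
  case class LowerCaseSchema(child: Plan) extends Plan

  def eliminate(plan: Plan): Plan = plan match {
    case LowerCaseSchema(child) => eliminate(child)
    case other                  => other
  }
}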
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index c71141c419..3dd0530225 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -73,11 +73,11 @@ trait HiveStrategies {
case p @ FilteredOperation(predicates, relation: MetastoreRelation)
if relation.isPartitioned =>
- val partitionKeyIds = relation.partitionKeys.map(_.id).toSet
+ val partitionKeyIds = relation.partitionKeys.map(_.exprId).toSet
// Filter out all predicates that only deal with partition keys
val (pruningPredicates, otherPredicates) = predicates.partition {
- _.references.map(_.id).subsetOf(partitionKeyIds)
+ _.references.map(_.exprId).subsetOf(partitionKeyIds)
}
val scan = HiveTableScan(
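This hunk is the exprId fix from the last squashed commit: TreeNode.id is fresh for every node instance, so copies of an attribute produced by plan transformations never compare equal under it, while exprId names the underlying attribute stably. A self-contained sketch of the predicate split above, using stand-in types rather than Spark's:

// Predicates whose references are all partition keys, compared by the
// stable exprId, can prune partitions up front; the remainder must be
// evaluated against each row of the scan.
object PruningSketch {
  case class Attribute(name: String, exprId: Long)
  case class Predicate(references: Set[Attribute])

  def splitPredicates(
      predicates: Seq[Predicate],
      partitionKeys: Seq[Attribute]): (Seq[Predicate], Seq[Predicate]) = {
    val partitionKeyIds = partitionKeys.map(_.exprId).toSet
    predicates.partition(_.references.map(_.exprId).subsetOf(partitionKeyIds))
  }
}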
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
index 996bd4efec..4bdea21467 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
@@ -19,6 +19,11 @@ package org.apache.spark.sql
package hive
package execution
+import TestHive._
+
+case class Data(a: Int, B: Int, n: Nested)
+case class Nested(a: Int, B: Int)
+
/**
* A set of test cases expressed in Hive QL that are not covered by the tests included in the hive distribution.
*/
@@ -47,6 +52,14 @@ class HiveResolutionSuite extends HiveComparisonTest {
createQueryTest("alias.*",
"SELECT a.* FROM src a ORDER BY key LIMIT 1")
+ test("case insensitivity with scala reflection") {
+ // Test resolution with Scala Reflection
+ TestHive.sparkContext.parallelize(Data(1, 2, Nested(1,2)) :: Nil)
+ .registerAsTable("caseSensitivityTest")
+
+ sql("SELECT a, b, A, B, n.a, n.b, n.A, n.B FROM caseSensitivityTest")
+ }
+
/**
* Negative examples. Currently only left here for documentation purposes.
* TODO(marmbrus): Test that catalyst fails on these queries.