From 293ce85145d7a37f7cb329831cbf921be571c2f5 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Sat, 20 Sep 2014 16:41:14 -0700 Subject: [SPARK-3414][SQL] Replace LowerCaseSchema with Resolver **This PR introduces a subtle change in semantics for HiveContext when using the results in Python or Scala. Specifically, while resolution remains case insensitive, it is now case preserving.** _This PR is a follow up to #2293 (and to a lesser extent #2262 #2334)._ In #2293 the catalog was changed to store analyzed logical plans instead of unresolved ones. While this change fixed the reported bug (which was caused by yet another instance of us forgetting to put in a `LowerCaseSchema` operator) it had the consequence of breaking assumptions made by `MultiInstanceRelation`. Specifically, we can't replace swap out leaf operators in a tree without rewriting changed expression ids (which happens when you self join the same RDD that has been registered as a temp table). In this PR, I instead remove the need to insert `LowerCaseSchema` operators at all, by moving the concern of matching up identifiers completely into analysis. Doing so allows the test cases from both #2293 and #2262 to pass at the same time (and likely fixes a slew of other "unknown unknown" bugs). While it is rolled back in this PR, storing the analyzed plan might actually be a good idea. For instance, it is kind of confusing if you register a temporary table, change the case sensitivity of resolution and now you can't query that table anymore. This can be addressed in a follow up PR. Follow-ups: - Configurable case sensitivity - Consider storing analyzed plans for temp tables Author: Michael Armbrust Closes #2382 from marmbrus/lowercase and squashes the following commits: c21171e [Michael Armbrust] Ensure the resolver is used for field lookups and ensure that case insensitive resolution is still case preserving. d4320f1 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into lowercase 2de881e [Michael Armbrust] Address comments. 219805a [Michael Armbrust] style 5b93711 [Michael Armbrust] Replace LowerCaseSchema with Resolver. --- .../scala/org/apache/spark/sql/hive/HiveContext.scala | 10 +--------- .../apache/spark/sql/hive/HiveMetastoreCatalog.scala | 6 ++---- .../org/apache/spark/sql/hive/HiveStrategies.scala | 4 ++-- .../sql/hive/execution/CreateTableAsSelect.scala | 4 +--- .../scala/org/apache/spark/sql/hive/hiveUdfs.scala | 7 ++++--- ...ase insensitive-0-98b2e34c9134208e9fe7c62d33010005 | 1 + .../sql/hive/execution/HiveResolutionSuite.scala | 19 +++++++++++++------ 7 files changed, 24 insertions(+), 27 deletions(-) create mode 100644 sql/hive/src/test/resources/golden/database.table table.attr case insensitive-0-98b2e34c9134208e9fe7c62d33010005 (limited to 'sql/hive') diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index e0be09e679..3e1a7b7152 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -244,15 +244,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { /* A catalyst metadata catalog that points to the Hive Metastore. */ @transient - override protected[sql] lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog { - override def lookupRelation( - databaseName: Option[String], - tableName: String, - alias: Option[String] = None): LogicalPlan = { - - LowerCaseSchema(super.lookupRelation(databaseName, tableName, alias)) - } - } + override protected[sql] lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog // Note that HiveUDFs will be overridden by functions registered in this context. @transient diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 2c0db9be57..6b4399e852 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -129,14 +129,12 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with // Wait until children are resolved. case p: LogicalPlan if !p.childrenResolved => p - case p @ InsertIntoTable( - LowerCaseSchema(table: MetastoreRelation), _, child, _) => + case p @ InsertIntoTable(table: MetastoreRelation, _, child, _) => castChildOutput(p, table, child) case p @ logical.InsertIntoTable( - LowerCaseSchema( InMemoryRelation(_, _, _, - HiveTableScan(_, table, _))), _, child, _) => + HiveTableScan(_, table, _)), _, child, _) => castChildOutput(p, table, child) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 43dd3d234f..8ac17f3720 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LowerCaseSchema} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.types.StringType import org.apache.spark.sql.columnar.InMemoryRelation import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan} @@ -55,7 +55,7 @@ private[hive] trait HiveStrategies { object ParquetConversion extends Strategy { implicit class LogicalPlanHacks(s: SchemaRDD) { def lowerCase = - new SchemaRDD(s.sqlContext, LowerCaseSchema(s.logicalPlan)) + new SchemaRDD(s.sqlContext, s.logicalPlan) def addPartitioningAttributes(attrs: Seq[Attribute]) = new SchemaRDD( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala index 71ea774d77..1017fe6d53 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala @@ -21,7 +21,6 @@ import org.apache.spark.annotation.Experimental import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.logical.LowerCaseSchema import org.apache.spark.sql.execution.{SparkPlan, Command, LeafNode} import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.MetastoreRelation @@ -52,8 +51,7 @@ case class CreateTableAsSelect( sc.catalog.createTable(database, tableName, query.output, false) // Get the Metastore Relation sc.catalog.lookupRelation(Some(database), tableName, None) match { - case LowerCaseSchema(r: MetastoreRelation) => r - case o: MetastoreRelation => o + case r: MetastoreRelation => r } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala index 5a0e6c5cc1..19ff3b66ad 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala @@ -44,10 +44,11 @@ private[hive] abstract class HiveFunctionRegistry def lookupFunction(name: String, children: Seq[Expression]): Expression = { // We only look it up to see if it exists, but do not include it in the HiveUDF since it is // not always serializable. - val functionInfo: FunctionInfo = Option(FunctionRegistry.getFunctionInfo(name)).getOrElse( - sys.error(s"Couldn't find function $name")) + val functionInfo: FunctionInfo = + Option(FunctionRegistry.getFunctionInfo(name.toLowerCase)).getOrElse( + sys.error(s"Couldn't find function $name")) - val functionClassName = functionInfo.getFunctionClass.getName() + val functionClassName = functionInfo.getFunctionClass.getName if (classOf[UDF].isAssignableFrom(functionInfo.getFunctionClass)) { val function = functionInfo.getFunctionClass.newInstance().asInstanceOf[UDF] diff --git a/sql/hive/src/test/resources/golden/database.table table.attr case insensitive-0-98b2e34c9134208e9fe7c62d33010005 b/sql/hive/src/test/resources/golden/database.table table.attr case insensitive-0-98b2e34c9134208e9fe7c62d33010005 new file mode 100644 index 0000000000..573541ac97 --- /dev/null +++ b/sql/hive/src/test/resources/golden/database.table table.attr case insensitive-0-98b2e34c9134208e9fe7c62d33010005 @@ -0,0 +1 @@ +0 diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala index b6be6bc1bf..ee9d08ff75 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala @@ -36,6 +36,9 @@ class HiveResolutionSuite extends HiveComparisonTest { createQueryTest("database.table table.attr", "SELECT src.key FROM default.src ORDER BY key LIMIT 1") + createQueryTest("database.table table.attr case insensitive", + "SELECT SRC.Key FROM Default.Src ORDER BY key LIMIT 1") + createQueryTest("alias.attr", "SELECT a.key FROM src a ORDER BY key LIMIT 1") @@ -56,14 +59,18 @@ class HiveResolutionSuite extends HiveComparisonTest { TestHive.sparkContext.parallelize(Data(1, 2, Nested(1,2), Seq(Nested(1,2))) :: Nil) .registerTempTable("caseSensitivityTest") - sql("SELECT a, b, A, B, n.a, n.b, n.A, n.B FROM caseSensitivityTest") - - println(sql("SELECT * FROM casesensitivitytest one JOIN casesensitivitytest two ON one.a = two.a").queryExecution) - - sql("SELECT * FROM casesensitivitytest one JOIN casesensitivitytest two ON one.a = two.a").collect() + val query = sql("SELECT a, b, A, B, n.a, n.b, n.A, n.B FROM caseSensitivityTest") + assert(query.schema.fields.map(_.name) === Seq("a", "b", "A", "B", "a", "b", "A", "B"), + "The output schema did not preserve the case of the query.") + query.collect() + } - // TODO: sql("SELECT * FROM casesensitivitytest a JOIN casesensitivitytest b ON a.a = b.a") + ignore("case insensitivity with scala reflection joins") { + // Test resolution with Scala Reflection + TestHive.sparkContext.parallelize(Data(1, 2, Nested(1,2), Seq(Nested(1,2))) :: Nil) + .registerTempTable("caseSensitivityTest") + sql("SELECT * FROM casesensitivitytest a JOIN casesensitivitytest b ON a.a = b.a").collect() } test("nested repeated resolution") { -- cgit v1.2.3