From 89af6dfc3afb2b8fc60fa74afb52541dbf3c4e8f Mon Sep 17 00:00:00 2001 From: wangfei Date: Mon, 27 Oct 2014 20:46:20 -0700 Subject: [SPARK-4041][SQL] Attributes names in table scan should converted to lowercase when compare with relation attributes In ```MetastoreRelation``` the attributes name is lowercase because of hive using lowercase for fields name, so we should convert attributes name in table scan lowercase in ```indexWhere(_.name == a.name)```. ```neededColumnIDs``` may be not correct if not convert to lowercase. Author: wangfei Author: scwf Closes #2884 from scwf/fixColumnIds and squashes the following commits: 6174046 [scwf] use AttributeMap for this issue dc74a24 [wangfei] use lowerName and add a test case for this issue 3ff3a80 [wangfei] more safer change 294fcb7 [scwf] attributes names in table scan should convert lowercase in neededColumnsIDs --- .../scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 6 ++++++ .../org/apache/spark/sql/hive/execution/HiveTableScan.scala | 9 +++++---- .../org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala | 9 +++++++++ 3 files changed, 20 insertions(+), 4 deletions(-) (limited to 'sql/hive') diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 04c48c3859..39d87a9d14 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -307,4 +307,10 @@ private[hive] case class MetastoreRelation val attributes = hiveQlTable.getCols.map(_.toAttribute) val output = attributes ++ partitionKeys + + /** An attribute map that can be used to lookup original attributes based on expression id. */ + val attributeMap = AttributeMap(output.map(o => (o,o))) + + /** An attribute map for determining the ordinal for non-partition columns. */ + val columnOrdinals = AttributeMap(attributes.zipWithIndex) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala index b7f3ade4ea..d39413a44a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala @@ -44,7 +44,7 @@ import org.apache.spark.sql.hive._ */ @DeveloperApi case class HiveTableScan( - attributes: Seq[Attribute], + requestedAttributes: Seq[Attribute], relation: MetastoreRelation, partitionPruningPred: Option[Expression])( @transient val context: HiveContext) @@ -53,6 +53,9 @@ case class HiveTableScan( require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned, "Partition pruning predicates only supported for partitioned tables.") + // Retrieve the original attributes based on expression ID so that capitalization matches. + val attributes = requestedAttributes.map(relation.attributeMap) + // Bind all partition key attribute references in the partition pruning predicate for later // evaluation. private[this] val boundPruningPred = partitionPruningPred.map { pred => @@ -81,9 +84,7 @@ case class HiveTableScan( private def addColumnMetadataToConf(hiveConf: HiveConf) { // Specifies needed column IDs for those non-partitioning columns. - val neededColumnIDs = - attributes.map(a => - relation.attributes.indexWhere(_.name == a.name): Integer).filter(index => index >= 0) + val neededColumnIDs = attributes.flatMap(relation.columnOrdinals.get).map(o => o: Integer) HiveShim.appendReadColumns(hiveConf, neededColumnIDs, attributes.map(_.name)) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala index c5736723b4..2f3db95882 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.hive.execution +import org.apache.spark.sql.hive.test.TestHive + class HiveTableScanSuite extends HiveComparisonTest { createQueryTest("partition_based_table_scan_with_different_serde", @@ -38,4 +40,11 @@ class HiveTableScanSuite extends HiveComparisonTest { | |SELECT * from part_scan_test; """.stripMargin) + + test("Spark-4041: lowercase issue") { + TestHive.sql("CREATE TABLE tb (KEY INT, VALUE STRING) STORED AS ORC") + TestHive.sql("insert into table tb select key, value from src") + TestHive.sql("select KEY from tb where VALUE='just_for_test' limit 5").collect() + TestHive.sql("drop table tb") + } } -- cgit v1.2.3