 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala         | 6 ++++++
 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala      | 9 +++++----
 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala | 9 +++++++++
 3 files changed, 20 insertions(+), 4 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 04c48c3859..39d87a9d14 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -307,4 +307,10 @@ private[hive] case class MetastoreRelation
   val attributes = hiveQlTable.getCols.map(_.toAttribute)
 
   val output = attributes ++ partitionKeys
+
+  /** An attribute map that can be used to lookup original attributes based on expression id. */
+  val attributeMap = AttributeMap(output.map(o => (o,o)))
+
+  /** An attribute map for determining the ordinal for non-partition columns. */
+  val columnOrdinals = AttributeMap(attributes.zipWithIndex)
 }
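Note: catalyst's AttributeMap keys its entries on the attribute's expression ID rather than its (case-sensitive) name, which is what makes the identity map above useful. A minimal self-contained sketch of the idea, using a hypothetical Attr class in place of Spark's real Attribute/ExprId types:

    // Hypothetical stand-in for catalyst's Attribute; only name and exprId matter here.
    case class Attr(name: String, exprId: Long)

    val attributes    = Seq(Attr("key", 1L), Attr("value", 2L)) // non-partition columns
    val partitionKeys = Seq(Attr("ds", 3L))
    val output        = attributes ++ partitionKeys

    // Mirrors AttributeMap(output.map(o => (o,o))): an identity map keyed on exprId.
    val attributeMap = output.map(o => o.exprId -> o).toMap

    // Mirrors AttributeMap(attributes.zipWithIndex): partition keys get no ordinal.
    val columnOrdinals = attributes.map(_.exprId).zipWithIndex.toMap

    assert(attributeMap(1L).name == "key") // lookup by id, original casing preserved
    assert(columnOrdinals(2L) == 1)        // "value" is non-partition column #1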
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index b7f3ade4ea..d39413a44a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -44,7 +44,7 @@ import org.apache.spark.sql.hive._
  */
 @DeveloperApi
 case class HiveTableScan(
-    attributes: Seq[Attribute],
+    requestedAttributes: Seq[Attribute],
     relation: MetastoreRelation,
     partitionPruningPred: Option[Expression])(
     @transient val context: HiveContext)
@@ -53,6 +53,9 @@ case class HiveTableScan(
   require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
     "Partition pruning predicates only supported for partitioned tables.")
 
+  // Retrieve the original attributes based on expression ID so that capitalization matches.
+  val attributes = requestedAttributes.map(relation.attributeMap)
+
   // Bind all partition key attribute references in the partition pruning predicate for later
   // evaluation.
   private[this] val boundPruningPred = partitionPruningPred.map { pred =>
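The mapping works because a Scala Map can be applied as a function, and AttributeMap's lookup goes through the expression ID: a query-side reference such as KEY resolves to the same ID as the relation's key, so the map hands back the original, metastore-cased attribute. A rough sketch under the same simplified Attr model as above:

    case class Attr(name: String, exprId: Long)

    // Keyed on exprId and applied as a function, like relation.attributeMap.
    val byExprId: Map[Long, Attr] = Map(1L -> Attr("key", 1L))
    def normalize(requested: Seq[Attr]): Seq[Attr] =
      requested.map(a => byExprId(a.exprId))

    // "KEY" as typed in the query shares exprId 1 with the metastore's "key".
    assert(normalize(Seq(Attr("KEY", 1L))).head.name == "key")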
@@ -81,9 +84,7 @@ case class HiveTableScan(
 
   private def addColumnMetadataToConf(hiveConf: HiveConf) {
     // Specifies needed column IDs for those non-partitioning columns.
-    val neededColumnIDs =
-      attributes.map(a =>
-        relation.attributes.indexWhere(_.name == a.name): Integer).filter(index => index >= 0)
+    val neededColumnIDs = attributes.flatMap(relation.columnOrdinals.get).map(o => o: Integer)
 
     HiveShim.appendReadColumns(hiveConf, neededColumnIDs, attributes.map(_.name))
 
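Two details of the replacement line are worth noting: flatMap over the Option returned by columnOrdinals.get silently drops attributes that have no ordinal (partition keys), replacing the old filter on -1 sentinels, and the o: Integer ascription boxes each Scala Int into java.lang.Integer for Hive's Java-side API. A small sketch, reusing the simplified ordinal map from the earlier example:

    // Simplified stand-in for relation.columnOrdinals: exprId -> ordinal.
    val columnOrdinals = Map(1L -> 0, 2L -> 1)
    val requestedExprIds = Seq(1L, 3L) // 3L is a partition key, absent from the map

    // Partition keys vanish via flatMap; Ints are boxed for the Java-side conf.
    val neededColumnIDs = requestedExprIds.flatMap(columnOrdinals.get).map(o => o: Integer)
    assert(neededColumnIDs == Seq(0: Integer))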
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
index c5736723b4..2f3db95882 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive.execution
 
+import org.apache.spark.sql.hive.test.TestHive
+
 class HiveTableScanSuite extends HiveComparisonTest {
 
   createQueryTest("partition_based_table_scan_with_different_serde",
@@ -38,4 +40,11 @@ class HiveTableScanSuite extends HiveComparisonTest {
       |
       |SELECT * from part_scan_test;
     """.stripMargin)
+
+  test("Spark-4041: lowercase issue") {
+    TestHive.sql("CREATE TABLE tb (KEY INT, VALUE STRING) STORED AS ORC")
+    TestHive.sql("insert into table tb select key, value from src")
+    TestHive.sql("select KEY from tb where VALUE='just_for_test' limit 5").collect()
+    TestHive.sql("drop table tb")
+  }
 }
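The test pins down the SPARK-4041 scenario: the DDL declares KEY and VALUE in uppercase, the Hive metastore stores them lowercased, and the query references them in uppercase again. Before this patch, the name-based indexWhere in addColumnMetadataToConf found no match, so no column IDs were pushed down to the ORC reader. A plain-Scala repro of that old failure mode (illustrative only, not Spark code):

    // Metastore lower-cases column names; the query keeps the user's capitalization.
    val metastoreCols = Seq("key", "value")
    val neededIDs = Seq("KEY", "VALUE")
      .map(n => metastoreCols.indexWhere(_ == n))
      .filter(_ >= 0)
    assert(neededIDs.isEmpty) // nothing matched, so no columns were requested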