author    Michael Armbrust <michael@databricks.com>  2014-07-31 11:15:25 -0700
committer Michael Armbrust <michael@databricks.com>  2014-07-31 11:15:25 -0700
commit 3072b96026fa3e63e8eef780f2b04dd81f11ea27 (patch)
tree   c768386f792860948ad456b5f3967ef9d14ad727 /sql
parent 92ca910eb866701e01b987a4f5003564b4785959 (diff)
[SPARK-2743][SQL] Resolve original attributes in ParquetTableScan
Author: Michael Armbrust <michael@databricks.com>

Closes #1647 from marmbrus/parquetCase and squashes the following commits:

a1799b7 [Michael Armbrust] move comment
2a2a68b [Michael Armbrust] Merge remote-tracking branch 'apache/master' into parquetCase
bb35d5b [Michael Armbrust] Fix test case that produced an invalid plan.
e6870bf [Michael Armbrust] Better error message.
539a2e1 [Michael Armbrust] Resolve original attributes in ParquetTableScan
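The fix: ParquetTableScan now takes the requested attributes and re-resolves them against relation.output by exprId, because matching Parquet attributes by name is case sensitive. A minimal, self-contained sketch of the idea (Attr and resolve below are hypothetical stand-ins, not Spark's actual Attribute API):

object ExprIdResolutionSketch extends App {
  // Hypothetical stand-in for Catalyst's Attribute: a name plus a stable id.
  case class Attr(name: String, exprId: Long)

  // Match on exprId rather than on the (case-sensitive) name, mirroring the
  // resolution loop this commit adds to ParquetTableScan.
  def resolve(requested: Seq[Attr], relationOutput: Seq[Attr]): Seq[Attr] =
    requested.map { a =>
      relationOutput
        .find(_.exprId == a.exprId)
        .getOrElse(sys.error(s"Invalid attribute $a in ${relationOutput.mkString(",")}"))
    }

  // "upper" from the query still resolves to "UPPER" from the Parquet schema
  // because the exprIds agree even though the names differ in case.
  val relationOut = Seq(Attr("lower", 1L), Attr("UPPER", 2L))
  println(resolve(Seq(Attr("upper", 2L)), relationOut)) // List(Attr(UPPER,2))
}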
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala | 14
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala | 14
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala | 17
3 files changed, 28 insertions(+), 17 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 912a9f002b..759a2a586b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -51,13 +51,20 @@ import org.apache.spark.{Logging, SerializableWritable, TaskContext}
* [[org.apache.spark.sql.parquet.ParquetRelation]] as a ``RDD[Row]``.
*/
case class ParquetTableScan(
- // note: output cannot be transient, see
- // https://issues.apache.org/jira/browse/SPARK-1367
- output: Seq[Attribute],
+ attributes: Seq[Attribute],
relation: ParquetRelation,
columnPruningPred: Seq[Expression])
extends LeafNode {
+ // The resolution of Parquet attributes is case sensitive, so we resolve the original attributes
+ // by exprId. note: output cannot be transient, see
+ // https://issues.apache.org/jira/browse/SPARK-1367
+ val output = attributes.map { a =>
+ relation.output
+ .find(o => o.exprId == a.exprId)
+ .getOrElse(sys.error(s"Invalid parquet attribute $a in ${relation.output.mkString(",")}"))
+ }
+
override def execute(): RDD[Row] = {
val sc = sqlContext.sparkContext
val job = new Job(sc.hadoopConfiguration)
@@ -110,7 +117,6 @@ case class ParquetTableScan(
ParquetTableScan(prunedAttributes, relation, columnPruningPred)
} else {
sys.error("Warning: Could not validate Parquet schema projection in pruneColumns")
- this
}
}
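A note on the second hunk: sys.error throws (its result type is Nothing), so the `this` that previously followed it was unreachable, and Nothing satisfies the method's expected return type without a placeholder value. A standalone illustration of that Scala property (not Spark code):

object FailFastSketch extends App {
  // sys.error returns Nothing, so the else branch typechecks as a String;
  // any statement placed after sys.error in that branch could never run.
  def choose(valid: Boolean): String =
    if (valid) "pruned scan"
    else sys.error("Could not validate Parquet schema projection")

  println(choose(valid = true)) // prints: pruned scan
}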
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 561f5b4a49..8955455ec9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -209,19 +209,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
}
test("Projection of simple Parquet file") {
- SparkPlan.currentContext.set(TestSQLContext)
- val scanner = new ParquetTableScan(
- ParquetTestData.testData.output,
- ParquetTestData.testData,
- Seq())
- val projected = scanner.pruneColumns(ParquetTypesConverter
- .convertToAttributes(MessageTypeParser
- .parseMessageType(ParquetTestData.subTestSchema)))
- assert(projected.output.size === 2)
- val result = projected
- .execute()
- .map(_.copy())
- .collect()
+ val result = ParquetTestData.testData.select('myboolean, 'mylong).collect()
result.zipWithIndex.foreach {
case (row, index) => {
if (index % 3 == 0)
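The rewritten test drives the same column pruning through the public DSL instead of hand-building a ParquetTableScan and calling pruneColumns, which avoids constructing the invalid plan the commit message mentions. Roughly, assuming the suite's existing import org.apache.spark.sql.test.TestSQLContext._ provides the symbol-based column syntax:

// Symbols name the columns to keep; the analyzer and planner resolve them
// against the Parquet schema, covering the pruning path end to end.
val result = ParquetTestData.testData.select('myboolean, 'mylong).collect()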
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
index 3bfe49a760..47526e3596 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.parquet
+import java.io.File
+
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row}
@@ -27,6 +29,8 @@ import org.apache.spark.util.Utils
// Implicits
import org.apache.spark.sql.hive.test.TestHive._
+case class Cases(lower: String, UPPER: String)
+
class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
val dirname = Utils.createTempDir()
@@ -55,6 +59,19 @@ class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAft
Utils.deleteRecursively(dirname)
}
+ test("Case insensitive attribute names") {
+ val tempFile = File.createTempFile("parquet", "")
+ tempFile.delete()
+ sparkContext.parallelize(1 to 10)
+ .map(_.toString)
+ .map(i => Cases(i, i))
+ .saveAsParquetFile(tempFile.getCanonicalPath)
+
+ parquetFile(tempFile.getCanonicalPath).registerAsTable("cases")
+ hql("SELECT upper FROM cases").collect().map(_.getString(0)) === (1 to 10).map(_.toString)
+ hql("SELECT LOWER FROM cases").collect().map(_.getString(0)) === (1 to 10).map(_.toString)
+ }
+
test("SELECT on Parquet table") {
val rdd = hql("SELECT * FROM testsource").collect()
assert(rdd != null)
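One caveat in the new HiveParquetSuite test as committed: the two === comparisons are bare expressions, so their results are never checked. A sketch of an asserted form (an editorial suggestion under the same schema, not part of this commit):

val expected = (1 to 10).map(_.toString)
assert(hql("SELECT upper FROM cases").collect().map(_.getString(0)).toSeq == expected)
assert(hql("SELECT LOWER FROM cases").collect().map(_.getString(0)).toSeq == expected)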