Diffstat (limited to 'sql')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala          | 17
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala  | 81
2 files changed, 97 insertions(+), 1 deletion(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index e49a235643..b4808fdbed 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -17,6 +17,10 @@
package org.apache.spark.sql.hive
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
@@ -230,10 +234,21 @@ class HadoopTableReader(
// Fill all partition keys to the given MutableRow object
fillPartitionKeys(partValues, mutableRow)
+ val tableProperties = relation.tableDesc.getProperties
+
createHadoopRdd(tableDesc, inputPathStr, ifc).mapPartitions { iter =>
val hconf = broadcastedHiveConf.value.value
val deserializer = localDeserializer.newInstance()
- deserializer.initialize(hconf, partProps)
+ // SPARK-13709: For SerDes like AvroSerDe, some essential information (e.g. the Avro
+ // schema) may be defined in table properties. Here we should merge table properties
+ // and partition properties before initializing the deserializer. Note that partition
+ // properties take higher priority here; for example, a partition may use a different
+ // SerDe than the one defined in table properties.
+ val props = new Properties(tableProperties)
+ partProps.asScala.foreach {
+ case (key, value) => props.setProperty(key, value)
+ }
+ deserializer.initialize(hconf, props)
// get the table deserializer
val tableSerDe = tableDesc.getDeserializerClass.newInstance()
tableSerDe.initialize(hconf, tableDesc.getProperties)
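
The merge above relies on the defaults mechanism of java.util.Properties: entries set directly on a Properties object shadow entries in its defaults table, while getProperty falls back to the defaults when a key is not set directly. Below is a minimal standalone sketch of that behavior (not part of the patch; the property values are illustrative placeholders):

import java.util.Properties
import scala.collection.JavaConverters._

object PropertiesMergeSketch {
  def main(args: Array[String]): Unit = {
    // Table-level properties act as the defaults layer.
    val tableProps = new Properties()
    tableProps.setProperty("avro.schema.literal", "<avro schema json>")
    tableProps.setProperty("serialization.format", "1")

    // Partition-level properties are laid on top and win on conflicts.
    val partProps = new Properties()
    partProps.setProperty("serialization.format", "2")

    val merged = new Properties(tableProps)
    partProps.asScala.foreach { case (key, value) => merged.setProperty(key, value) }

    println(merged.getProperty("avro.schema.literal"))  // falls back to the table default
    println(merged.getProperty("serialization.format")) // prints "2": the partition value wins
  }
}

Note that Properties defaults are visible through getProperty, propertyNames and stringPropertyNames, but not through Hashtable views such as keys or entrySet; that is fine for SerDes that read their configuration via getProperty.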
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
index f7650e001a..feeaade561 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
@@ -17,7 +17,10 @@
package org.apache.spark.sql.hive
+import java.io.File
+
import com.google.common.io.Files
+import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql._
import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -65,4 +68,82 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingl
sql("DROP TABLE IF EXISTS createAndInsertTest")
}
}
+
+ test("SPARK-13709: reading partitioned Avro table with nested schema") {
+ withTempDir { dir =>
+ val path = dir.getCanonicalPath
+ val tableName = "spark_13709"
+ val tempTableName = "spark_13709_temp"
+
+ new File(path, tableName).mkdir()
+ new File(path, tempTableName).mkdir()
+
+ val avroSchema =
+ """{
+ | "name": "test_record",
+ | "type": "record",
+ | "fields": [ {
+ | "name": "f0",
+ | "type": "int"
+ | }, {
+ | "name": "f1",
+ | "type": {
+ | "type": "record",
+ | "name": "inner",
+ | "fields": [ {
+ | "name": "f10",
+ | "type": "int"
+ | }, {
+ | "name": "f11",
+ | "type": "double"
+ | } ]
+ | }
+ | } ]
+ |}
+ """.stripMargin
+
+ withTable(tableName, tempTableName) {
+ // Creates the external partitioned Avro table to be tested.
+ sql(
+ s"""CREATE EXTERNAL TABLE $tableName
+ |PARTITIONED BY (ds STRING)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+ |LOCATION '$path/$tableName'
+ |TBLPROPERTIES ('avro.schema.literal' = '$avroSchema')
+ """.stripMargin
+ )
+
+ // Creates a temporary Avro table used to prepare the test Avro file.
+ sql(
+ s"""CREATE EXTERNAL TABLE $tempTableName
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+ |LOCATION '$path/$tempTableName'
+ |TBLPROPERTIES ('avro.schema.literal' = '$avroSchema')
+ """.stripMargin
+ )
+
+ // Generates Avro data.
+ sql(s"INSERT OVERWRITE TABLE $tempTableName SELECT 1, STRUCT(2, 2.5)")
+
+ // Adds generated Avro data as a new partition to the testing table.
+ sql(s"ALTER TABLE $tableName ADD PARTITION (ds = 'foo') LOCATION '$path/$tempTableName'")
+
+ // The following query fails before SPARK-13709 is fixed. This is because when reading data
+ // from table partitions, the Avro deserializer needs the Avro schema, which is defined in
+ // the table property "avro.schema.literal". However, the deserializer used to be initialized
+ // with only the partition properties, which don't include that entry. Merging the two sets
+ // of properties fixes the problem.
+ checkAnswer(
+ sql(s"SELECT * FROM $tableName"),
+ Row(1, Row(2, 2.5D), "foo")
+ )
+ }
+ }
+ }
}
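
To make the failure mode described in the test comment concrete: before this change the deserializer was initialized with the partition properties alone, and those do not carry the table-level avro.schema.literal entry, so the schema lookup returned nothing. A reduced sketch of the two initialization inputs, using plain Properties calls rather than the SerDe's actual code path:

import java.util.Properties

object Spark13709LookupSketch {
  def main(args: Array[String]): Unit = {
    // The Avro schema literal is defined on the table only.
    val tableProps = new Properties()
    tableProps.setProperty("avro.schema.literal", "<avro schema json>")

    // The partition itself carries no schema entry.
    val partProps = new Properties()

    // Pre-fix input: partition properties alone -- the schema is missing.
    assert(partProps.getProperty("avro.schema.literal") == null)

    // Post-fix input: partition properties backed by table properties as defaults.
    val merged = new Properties(tableProps)
    assert(merged.getProperty("avro.schema.literal") != null)
  }
}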