-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala            | 18
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 38
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala          | 114
3 files changed, 164 insertions(+), 6 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 95bea92011..16b771344b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -287,7 +287,16 @@ private[sql] case class ParquetRelation2(
}
}
- parquetSchema = maybeSchema.getOrElse(readSchema())
+ // To get the schema, we first try the schema defined in maybeSchema.
+ // If maybeSchema is not defined, we try to read the schema from existing Parquet data
+ // (through readSchema). If no data exists, we fall back to the schema defined in
+ // maybeMetastoreSchema (provided in the options of the data source).
+ // Finally, if we still cannot get a schema, we throw an error.
+ parquetSchema =
+ maybeSchema
+ .orElse(readSchema())
+ .orElse(maybeMetastoreSchema)
+ .getOrElse(sys.error("Failed to get the schema."))
partitionKeysIncludedInParquetSchema =
isPartitioned &&
@@ -308,7 +317,7 @@ private[sql] case class ParquetRelation2(
}
}
- private def readSchema(): StructType = {
+ private def readSchema(): Option[StructType] = {
// Sees which file(s) we need to touch in order to figure out the schema.
val filesToTouch =
// Always tries the summary files first if users don't require a merged schema. In this case,
@@ -611,7 +620,8 @@ private[sql] object ParquetRelation2 {
// internally.
private[sql] val METASTORE_SCHEMA = "metastoreSchema"
- private[parquet] def readSchema(footers: Seq[Footer], sqlContext: SQLContext): StructType = {
+ private[parquet] def readSchema(
+ footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
footers.map { footer =>
val metadata = footer.getParquetMetadata.getFileMetaData
val parquetSchema = metadata.getSchema
@@ -630,7 +640,7 @@ private[sql] object ParquetRelation2 {
sqlContext.conf.isParquetBinaryAsString,
sqlContext.conf.isParquetINT96AsTimestamp))
}
- }.reduce { (left, right) =>
+ }.reduceOption { (left, right) =>
try left.merge(right) catch { case e: Throwable =>
throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
}
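
The change above is the heart of the fix: schema resolution now walks an Option-based fallback chain, and merging per-footer schemas with reduceOption returns None for an empty table instead of throwing. Below is a minimal, self-contained sketch of the same idea in plain Scala; the String schemas and the stubbed readSchema are illustrative stand-ins, not the actual ParquetRelation2 members.

object SchemaFallbackSketch {
  // Stand-ins for the real inputs: a user-supplied schema, the schema read
  // from Parquet footers, and the schema recorded by the Hive metastore.
  val maybeSchema: Option[String] = None
  val maybeMetastoreSchema: Option[String] = Some("a INT, b STRING")

  // Merging zero footers with reduceOption yields None instead of throwing,
  // which is what makes an empty Parquet table readable.
  def readSchema(footerSchemas: Seq[String]): Option[String] =
    footerSchemas.reduceOption { (left, right) => s"$left merged with $right" }

  def main(args: Array[String]): Unit = {
    val schema =
      maybeSchema
        .orElse(readSchema(Nil))        // no Parquet files written yet
        .orElse(maybeMetastoreSchema)   // fall back to the metastore schema
        .getOrElse(sys.error("Failed to get the schema."))
    println(schema)                     // prints: a INT, b STRING
  }
}
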
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 485d5c95bf..c30090fabb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive
import java.io.File
+
import org.scalatest.BeforeAndAfterEach
import org.apache.commons.io.FileUtils
@@ -30,6 +31,8 @@ import org.apache.spark.util.Utils
import org.apache.spark.sql.types._
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
+import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.sources.LogicalRelation
/**
* Tests for persisting tables created through the data sources API into the metastore.
@@ -553,4 +556,39 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
sql("DROP TABLE savedJsonTable")
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource)
}
+
+ if (HiveShim.version == "0.13.1") {
+ test("scan a parquet table created through a CTAS statement") {
+ val originalConvertMetastore = getConf("spark.sql.hive.convertMetastoreParquet", "true")
+ val originalUseDataSource = getConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+ setConf("spark.sql.hive.convertMetastoreParquet", "true")
+ setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+
+ val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+ jsonRDD(rdd).registerTempTable("jt")
+ sql(
+ """
+ |create table test_parquet_ctas STORED AS parquET
+ |AS select tmp.a from jt tmp where tmp.a < 5
+ """.stripMargin)
+
+ checkAnswer(
+ sql(s"SELECT a FROM test_parquet_ctas WHERE a > 2 "),
+ Row(3) :: Row(4) :: Nil
+ )
+
+ table("test_parquet_ctas").queryExecution.analyzed match {
+ case LogicalRelation(p: ParquetRelation2) => // OK
+ case _ =>
+ fail(
+ s"test_parquet_ctas should be converted to ${classOf[ParquetRelation2].getCanonicalName}")
+ }
+
+ // Cleanup and reset confs.
+ sql("DROP TABLE IF EXISTS jt")
+ sql("DROP TABLE IF EXISTS test_parquet_ctas")
+ setConf("spark.sql.hive.convertMetastoreParquet", originalConvertMetastore)
+ setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalUseDataSource)
+ }
+ }
}
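
The new test flips spark.sql.hive.convertMetastoreParquet and PARQUET_USE_DATA_SOURCE_API on, runs the CTAS query, and then restores the original values. Here is a hedged sketch of that save/set/restore pattern, using a plain mutable Map as a stand-in for SQLConf and a hypothetical withConf helper (not an existing Spark API); wrapping the restore in finally guarantees it even when an assertion fails, whereas the test above restores the confs inline.

import scala.collection.mutable

object ConfRestoreSketch {
  // Stand-in for SQLConf: a simple mutable key/value store.
  val conf = mutable.Map("spark.sql.hive.convertMetastoreParquet" -> "false")

  // Run `body` with the conf temporarily set, restoring the previous value
  // afterwards even if `body` throws.
  def withConf[T](key: String, value: String)(body: => T): T = {
    val original = conf.get(key)
    conf(key) = value
    try body
    finally original match {
      case Some(v) => conf(key) = v
      case None    => conf.remove(key)
    }
  }

  def main(args: Array[String]): Unit = {
    withConf("spark.sql.hive.convertMetastoreParquet", "true") {
      println(conf("spark.sql.hive.convertMetastoreParquet"))  // true
    }
    println(conf("spark.sql.hive.convertMetastoreParquet"))    // back to false
  }
}
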
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 2acf1a7767..653f4b4736 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -20,15 +20,15 @@ package org.apache.spark.sql.parquet
import java.io.File
-import org.apache.spark.sql.catalyst.expressions.Row
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql.{SQLConf, QueryTest}
+import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.execution.PhysicalRDD
import org.apache.spark.sql.hive.execution.HiveTableScan
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
-
+import org.apache.spark.sql.sources.LogicalRelation
// The data where the partitioning key exists only in the directory structure.
case class ParquetData(intField: Int, stringField: String)
@@ -121,13 +121,123 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
override def beforeAll(): Unit = {
super.beforeAll()
+
+ val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+ jsonRDD(rdd).registerTempTable("jt")
+
+ sql(
+ """
+ |create table test_parquet
+ |(
+ | intField INT,
+ | stringField STRING
+ |)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ """.stripMargin)
+
conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
}
override def afterAll(): Unit = {
super.afterAll()
+ sql("DROP TABLE IF EXISTS jt")
+ sql("DROP TABLE IF EXISTS test_parquet")
+
setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
}
+
+ test("scan an empty parquet table") {
+ checkAnswer(sql("SELECT count(*) FROM test_parquet"), Row(0))
+ }
+
+ test("scan an empty parquet table with upper case") {
+ checkAnswer(sql("SELECT count(INTFIELD) FROM TEST_parquet"), Row(0))
+ }
+
+ test("insert into an empty parquet table") {
+ sql(
+ """
+ |create table test_insert_parquet
+ |(
+ | intField INT,
+ | stringField STRING
+ |)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ """.stripMargin)
+
+ // Insert into an empty table.
+ sql("insert into table test_insert_parquet select a, b from jt where jt.a > 5")
+ checkAnswer(
+ sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE intField < 8"),
+ Row(6, "str6") :: Row(7, "str7") :: Nil
+ )
+ // Insert overwrite.
+ sql("insert overwrite table test_insert_parquet select a, b from jt where jt.a < 5")
+ checkAnswer(
+ sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE intField > 2"),
+ Row(3, "str3") :: Row(4, "str4") :: Nil
+ )
+ sql("DROP TABLE IF EXISTS test_insert_parquet")
+
+ // Create it again.
+ sql(
+ """
+ |create table test_insert_parquet
+ |(
+ | intField INT,
+ | stringField STRING
+ |)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ """.stripMargin)
+ // Insert overwrite an empty table.
+ sql("insert overwrite table test_insert_parquet select a, b from jt where jt.a < 5")
+ checkAnswer(
+ sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE intField > 2"),
+ Row(3, "str3") :: Row(4, "str4") :: Nil
+ )
+ // Insert into the table.
+ sql("insert into table test_insert_parquet select a, b from jt")
+ checkAnswer(
+ sql(s"SELECT intField, stringField FROM test_insert_parquet"),
+ (1 to 10).map(i => Row(i, s"str$i")) ++ (1 to 4).map(i => Row(i, s"str$i"))
+ )
+ sql("DROP TABLE IF EXISTS test_insert_parquet")
+ }
+
+ test("scan a parquet table created through a CTAS statement") {
+ sql(
+ """
+ |create table test_parquet_ctas ROW FORMAT
+ |SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ |AS select * from jt
+ """.stripMargin)
+
+ checkAnswer(
+ sql(s"SELECT a, b FROM test_parquet_ctas WHERE a = 1"),
+ Seq(Row(1, "str1"))
+ )
+
+ table("test_parquet_ctas").queryExecution.analyzed match {
+ case LogicalRelation(p: ParquetRelation2) => // OK
+ case _ =>
+ fail(
+ s"test_parquet_ctas should be converted to ${classOf[ParquetRelation2].getCanonicalName}")
+ }
+
+ sql("DROP TABLE IF EXISTS test_parquet_ctas")
+ }
}
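
These new tests exercise exactly the empty-table path that motivated the reduceOption change: an empty Parquet table has no footers to read, and reducing an empty collection of per-footer schemas throws. A minimal plain-Scala sketch of the difference, with strings standing in for schemas:

object EmptyReduceSketch {
  def main(args: Array[String]): Unit = {
    val footerSchemas: Seq[String] = Nil  // an empty Parquet table yields no footer schemas

    // reduce on an empty collection throws UnsupportedOperationException,
    // which is roughly what a scan of an empty table used to hit.
    val merged =
      try Some(footerSchemas.reduce(_ + ", " + _))
      catch { case _: UnsupportedOperationException => None }
    println(merged)                                    // None

    // reduceOption returns None directly, so the caller can fall back to
    // the metastore schema instead of failing the scan.
    println(footerSchemas.reduceOption(_ + ", " + _))  // None
  }
}
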
class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {