From 117121a4ecaadda156a82255333670775e7727db Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Tue, 17 Feb 2015 15:47:59 -0800 Subject: [SPARK-5852][SQL]Fail to convert a newly created empty metastore parquet table to a data source parquet table. The problem is that after we create an empty hive metastore parquet table (e.g. `CREATE TABLE test (a int) STORED AS PARQUET`), Hive will create an empty dir for us, which cause our data source `ParquetRelation2` fail to get the schema of the table. See JIRA for the case to reproduce the bug and the exception. This PR is based on #4562 from chenghao-intel. JIRA: https://issues.apache.org/jira/browse/SPARK-5852 Author: Yin Huai Author: Cheng Hao Closes #4655 from yhuai/CTASParquet and squashes the following commits: b8b3450 [Yin Huai] Update tests. 2ac94f7 [Yin Huai] Update tests. 3db3d20 [Yin Huai] Minor update. d7e2308 [Yin Huai] Revert changes in HiveMetastoreCatalog.scala. 36978d1 [Cheng Hao] Update the code as feedback a04930b [Cheng Hao] fix bug of scan an empty parquet based table 442ffe0 [Cheng Hao] passdown the schema for Parquet File in HiveContext --- .../org/apache/spark/sql/parquet/newParquet.scala | 18 +++- .../spark/sql/hive/MetastoreDataSourcesSuite.scala | 38 +++++++ .../apache/spark/sql/parquet/parquetSuites.scala | 114 ++++++++++++++++++++- 3 files changed, 164 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 95bea92011..16b771344b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -287,7 +287,16 @@ private[sql] case class ParquetRelation2( } } - parquetSchema = maybeSchema.getOrElse(readSchema()) + // To get the schema. We first try to get the schema defined in maybeSchema. + // If maybeSchema is not defined, we will try to get the schema from existing parquet data + // (through readSchema). If data does not exist, we will try to get the schema defined in + // maybeMetastoreSchema (defined in the options of the data source). + // Finally, if we still could not get the schema. We throw an error. + parquetSchema = + maybeSchema + .orElse(readSchema()) + .orElse(maybeMetastoreSchema) + .getOrElse(sys.error("Failed to get the schema.")) partitionKeysIncludedInParquetSchema = isPartitioned && @@ -308,7 +317,7 @@ private[sql] case class ParquetRelation2( } } - private def readSchema(): StructType = { + private def readSchema(): Option[StructType] = { // Sees which file(s) we need to touch in order to figure out the schema. val filesToTouch = // Always tries the summary files first if users don't require a merged schema. In this case, @@ -611,7 +620,8 @@ private[sql] object ParquetRelation2 { // internally. private[sql] val METASTORE_SCHEMA = "metastoreSchema" - private[parquet] def readSchema(footers: Seq[Footer], sqlContext: SQLContext): StructType = { + private[parquet] def readSchema( + footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = { footers.map { footer => val metadata = footer.getParquetMetadata.getFileMetaData val parquetSchema = metadata.getSchema @@ -630,7 +640,7 @@ private[sql] object ParquetRelation2 { sqlContext.conf.isParquetBinaryAsString, sqlContext.conf.isParquetINT96AsTimestamp)) } - }.reduce { (left, right) => + }.reduceOption { (left, right) => try left.merge(right) catch { case e: Throwable => throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 485d5c95bf..c30090fabb 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive import java.io.File + import org.scalatest.BeforeAndAfterEach import org.apache.commons.io.FileUtils @@ -30,6 +31,8 @@ import org.apache.spark.util.Utils import org.apache.spark.sql.types._ import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ +import org.apache.spark.sql.parquet.ParquetRelation2 +import org.apache.spark.sql.sources.LogicalRelation /** * Tests for persisting tables created though the data sources API into the metastore. @@ -553,4 +556,39 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { sql("DROP TABLE savedJsonTable") conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource) } + + if (HiveShim.version == "0.13.1") { + test("scan a parquet table created through a CTAS statement") { + val originalConvertMetastore = getConf("spark.sql.hive.convertMetastoreParquet", "true") + val originalUseDataSource = getConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true") + setConf("spark.sql.hive.convertMetastoreParquet", "true") + setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true") + + val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) + jsonRDD(rdd).registerTempTable("jt") + sql( + """ + |create table test_parquet_ctas STORED AS parquET + |AS select tmp.a from jt tmp where tmp.a < 5 + """.stripMargin) + + checkAnswer( + sql(s"SELECT a FROM test_parquet_ctas WHERE a > 2 "), + Row(3) :: Row(4) :: Nil + ) + + table("test_parquet_ctas").queryExecution.analyzed match { + case LogicalRelation(p: ParquetRelation2) => // OK + case _ => + fail( + s"test_parquet_ctas should be converted to ${classOf[ParquetRelation2].getCanonicalName}") + } + + // Clenup and reset confs. + sql("DROP TABLE IF EXISTS jt") + sql("DROP TABLE IF EXISTS test_parquet_ctas") + setConf("spark.sql.hive.convertMetastoreParquet", originalConvertMetastore) + setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalUseDataSource) + } + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala index 2acf1a7767..653f4b4736 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala @@ -20,15 +20,15 @@ package org.apache.spark.sql.parquet import java.io.File -import org.apache.spark.sql.catalyst.expressions.Row import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql.{SQLConf, QueryTest} +import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.execution.PhysicalRDD import org.apache.spark.sql.hive.execution.HiveTableScan import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ - +import org.apache.spark.sql.sources.LogicalRelation // The data where the partitioning key exists only in the directory structure. case class ParquetData(intField: Int, stringField: String) @@ -121,13 +121,123 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { override def beforeAll(): Unit = { super.beforeAll() + + val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) + jsonRDD(rdd).registerTempTable("jt") + + sql( + """ + |create table test_parquet + |( + | intField INT, + | stringField STRING + |) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + |STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + """.stripMargin) + conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true") } override def afterAll(): Unit = { super.afterAll() + sql("DROP TABLE IF EXISTS jt") + sql("DROP TABLE IF EXISTS test_parquet") + setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString) } + + test("scan an empty parquet table") { + checkAnswer(sql("SELECT count(*) FROM test_parquet"), Row(0)) + } + + test("scan an empty parquet table with upper case") { + checkAnswer(sql("SELECT count(INTFIELD) FROM TEST_parquet"), Row(0)) + } + + test("insert into an empty parquet table") { + sql( + """ + |create table test_insert_parquet + |( + | intField INT, + | stringField STRING + |) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + |STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + """.stripMargin) + + // Insert into am empty table. + sql("insert into table test_insert_parquet select a, b from jt where jt.a > 5") + checkAnswer( + sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE intField < 8"), + Row(6, "str6") :: Row(7, "str7") :: Nil + ) + // Insert overwrite. + sql("insert overwrite table test_insert_parquet select a, b from jt where jt.a < 5") + checkAnswer( + sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE intField > 2"), + Row(3, "str3") :: Row(4, "str4") :: Nil + ) + sql("DROP TABLE IF EXISTS test_insert_parquet") + + // Create it again. + sql( + """ + |create table test_insert_parquet + |( + | intField INT, + | stringField STRING + |) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + |STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + """.stripMargin) + // Insert overwrite an empty table. + sql("insert overwrite table test_insert_parquet select a, b from jt where jt.a < 5") + checkAnswer( + sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE intField > 2"), + Row(3, "str3") :: Row(4, "str4") :: Nil + ) + // Insert into the table. + sql("insert into table test_insert_parquet select a, b from jt") + checkAnswer( + sql(s"SELECT intField, stringField FROM test_insert_parquet"), + (1 to 10).map(i => Row(i, s"str$i")) ++ (1 to 4).map(i => Row(i, s"str$i")) + ) + sql("DROP TABLE IF EXISTS test_insert_parquet") + } + + test("scan a parquet table created through a CTAS statement") { + sql( + """ + |create table test_parquet_ctas ROW FORMAT + |SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + |STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + |AS select * from jt + """.stripMargin) + + checkAnswer( + sql(s"SELECT a, b FROM test_parquet_ctas WHERE a = 1"), + Seq(Row(1, "str1")) + ) + + table("test_parquet_ctas").queryExecution.analyzed match { + case LogicalRelation(p: ParquetRelation2) => // OK + case _ => + fail( + s"test_parquet_ctas should be converted to ${classOf[ParquetRelation2].getCanonicalName}") + } + + sql("DROP TABLE IF EXISTS test_parquet_ctas") + } } class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase { -- cgit v1.2.3