author     Yin Huai <yhuai@databricks.com>            2015-02-17 15:47:59 -0800
committer  Michael Armbrust <michael@databricks.com>  2015-02-17 15:47:59 -0800
commit     117121a4ecaadda156a82255333670775e7727db (patch)
tree       d20cb0ce4d7e58623a5526a6e5fd749cdb1ee95a /sql/hive
parent     4d4cc760fa9687ce563320094557ef9144488676 (diff)
[SPARK-5852][SQL] Fail to convert a newly created empty metastore parquet table to a data source parquet table.

The problem is that after we create an empty Hive metastore parquet table (e.g. `CREATE TABLE test (a int) STORED AS PARQUET`), Hive creates an empty directory for us, which causes our data source `ParquetRelation2` to fail to get the schema of the table. See the JIRA for the steps to reproduce the bug and the resulting exception.

This PR is based on #4562 from chenghao-intel.

JIRA: https://issues.apache.org/jira/browse/SPARK-5852

Author: Yin Huai <yhuai@databricks.com>
Author: Cheng Hao <hao.cheng@intel.com>

Closes #4655 from yhuai/CTASParquet and squashes the following commits:

b8b3450 [Yin Huai] Update tests.
2ac94f7 [Yin Huai] Update tests.
3db3d20 [Yin Huai] Minor update.
d7e2308 [Yin Huai] Revert changes in HiveMetastoreCatalog.scala.
36978d1 [Cheng Hao] Update the code as feedback
a04930b [Cheng Hao] fix bug of scan an empty parquet based table
442ffe0 [Cheng Hao] passdown the schema for Parquet File in HiveContext
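For context, a minimal sketch of the reproduction described above, assuming a Spark 1.3-era HiveContext and the same configuration flags the tests below toggle; the SparkContext `sc` and the table name `test` are illustrative, not part of this patch:

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLConf
    import org.apache.spark.sql.hive.HiveContext

    // `sc` stands in for an already-running SparkContext.
    val sc: SparkContext = ???
    val hiveContext = new HiveContext(sc)

    // Convert metastore Parquet tables and use the data source Parquet path,
    // mirroring the configuration toggled in the tests below.
    hiveContext.setConf("spark.sql.hive.convertMetastoreParquet", "true")
    hiveContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")

    // Hive creates an empty directory for the new table; before this fix
    // ParquetRelation2 could not derive a schema from it, so the scan threw.
    hiveContext.sql("CREATE TABLE test (a INT) STORED AS PARQUET")
    hiveContext.sql("SELECT * FROM test").collect()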
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala |  38
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala          | 114
2 files changed, 150 insertions(+), 2 deletions(-)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 485d5c95bf..c30090fabb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive
import java.io.File
+
import org.scalatest.BeforeAndAfterEach
import org.apache.commons.io.FileUtils
@@ -30,6 +31,8 @@ import org.apache.spark.util.Utils
import org.apache.spark.sql.types._
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
+import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.sources.LogicalRelation
/**
* Tests for persisting tables created though the data sources API into the metastore.
@@ -553,4 +556,39 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
sql("DROP TABLE savedJsonTable")
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource)
}
+
+ if (HiveShim.version == "0.13.1") {
+ test("scan a parquet table created through a CTAS statement") {
+ val originalConvertMetastore = getConf("spark.sql.hive.convertMetastoreParquet", "true")
+ val originalUseDataSource = getConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+ setConf("spark.sql.hive.convertMetastoreParquet", "true")
+ setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+
+ val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+ jsonRDD(rdd).registerTempTable("jt")
+ sql(
+ """
+ |create table test_parquet_ctas STORED AS parquET
+ |AS select tmp.a from jt tmp where tmp.a < 5
+ """.stripMargin)
+
+ checkAnswer(
+ sql(s"SELECT a FROM test_parquet_ctas WHERE a > 2 "),
+ Row(3) :: Row(4) :: Nil
+ )
+
+ table("test_parquet_ctas").queryExecution.analyzed match {
+ case LogicalRelation(p: ParquetRelation2) => // OK
+ case _ =>
+ fail(
+ s"test_parquet_ctas should be converted to ${classOf[ParquetRelation2].getCanonicalName}")
+ }
+
+ // Cleanup and reset confs.
+ sql("DROP TABLE IF EXISTS jt")
+ sql("DROP TABLE IF EXISTS test_parquet_ctas")
+ setConf("spark.sql.hive.convertMetastoreParquet", originalConvertMetastore)
+ setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalUseDataSource)
+ }
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 2acf1a7767..653f4b4736 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -20,15 +20,15 @@ package org.apache.spark.sql.parquet
import java.io.File
-import org.apache.spark.sql.catalyst.expressions.Row
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql.{SQLConf, QueryTest}
+import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.execution.PhysicalRDD
import org.apache.spark.sql.hive.execution.HiveTableScan
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
-
+import org.apache.spark.sql.sources.LogicalRelation
// The data where the partitioning key exists only in the directory structure.
case class ParquetData(intField: Int, stringField: String)
@@ -121,13 +121,123 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
override def beforeAll(): Unit = {
super.beforeAll()
+
+ val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+ jsonRDD(rdd).registerTempTable("jt")
+
+ sql(
+ """
+ |create table test_parquet
+ |(
+ | intField INT,
+ | stringField STRING
+ |)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ """.stripMargin)
+
conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
}
override def afterAll(): Unit = {
super.afterAll()
+ sql("DROP TABLE IF EXISTS jt")
+ sql("DROP TABLE IF EXISTS test_parquet")
+
setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
}
+
+ test("scan an empty parquet table") {
+ checkAnswer(sql("SELECT count(*) FROM test_parquet"), Row(0))
+ }
+
+ test("scan an empty parquet table with upper case") {
+ checkAnswer(sql("SELECT count(INTFIELD) FROM TEST_parquet"), Row(0))
+ }
+
+ test("insert into an empty parquet table") {
+ sql(
+ """
+ |create table test_insert_parquet
+ |(
+ | intField INT,
+ | stringField STRING
+ |)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ """.stripMargin)
+
+ // Insert into an empty table.
+ sql("insert into table test_insert_parquet select a, b from jt where jt.a > 5")
+ checkAnswer(
+ sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE intField < 8"),
+ Row(6, "str6") :: Row(7, "str7") :: Nil
+ )
+ // Insert overwrite.
+ sql("insert overwrite table test_insert_parquet select a, b from jt where jt.a < 5")
+ checkAnswer(
+ sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE intField > 2"),
+ Row(3, "str3") :: Row(4, "str4") :: Nil
+ )
+ sql("DROP TABLE IF EXISTS test_insert_parquet")
+
+ // Create it again.
+ sql(
+ """
+ |create table test_insert_parquet
+ |(
+ | intField INT,
+ | stringField STRING
+ |)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ """.stripMargin)
+ // Insert overwrite an empty table.
+ sql("insert overwrite table test_insert_parquet select a, b from jt where jt.a < 5")
+ checkAnswer(
+ sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE intField > 2"),
+ Row(3, "str3") :: Row(4, "str4") :: Nil
+ )
+ // Insert into the table.
+ sql("insert into table test_insert_parquet select a, b from jt")
+ checkAnswer(
+ sql(s"SELECT intField, stringField FROM test_insert_parquet"),
+ (1 to 10).map(i => Row(i, s"str$i")) ++ (1 to 4).map(i => Row(i, s"str$i"))
+ )
+ sql("DROP TABLE IF EXISTS test_insert_parquet")
+ }
+
+ test("scan a parquet table created through a CTAS statement") {
+ sql(
+ """
+ |create table test_parquet_ctas ROW FORMAT
+ |SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ |AS select * from jt
+ """.stripMargin)
+
+ checkAnswer(
+ sql(s"SELECT a, b FROM test_parquet_ctas WHERE a = 1"),
+ Seq(Row(1, "str1"))
+ )
+
+ table("test_parquet_ctas").queryExecution.analyzed match {
+ case LogicalRelation(p: ParquetRelation2) => // OK
+ case _ =>
+ fail(
+ s"test_parquet_ctas should be converted to ${classOf[ParquetRelation2].getCanonicalName}")
+ }
+
+ sql("DROP TABLE IF EXISTS test_parquet_ctas")
+ }
}
class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {