From b0319c2ecb51bb97c3228afa4a384572b9ffbce6 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Tue, 10 Jan 2017 19:26:51 +0800
Subject: [SPARK-19107][SQL] support creating hive table with DataFrameWriter and Catalog

## What changes were proposed in this pull request?

After unifying the CREATE TABLE syntax in https://github.com/apache/spark/pull/16296, it is now straightforward to support creating Hive tables with `DataFrameWriter` and `Catalog`. This PR simply removes the hive provider check in `DataFrameWriter.saveAsTable` and `Catalog.createExternalTable`, and adds tests (a brief usage sketch, based on the new tests, follows the diff).

## How was this patch tested?

New tests in `HiveDDLSuite`.

Author: Wenchen Fan

Closes #16487 from cloud-fan/hive-table.
---
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala | 20 ------
 .../spark/sql/hive/execution/HiveDDLSuite.scala    | 77 ++++++++++++++++++++++
 2 files changed, 77 insertions(+), 20 deletions(-)

(limited to 'sql/hive/src/test/scala/org')

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index aed825e2f3..13ef79e3b7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1169,26 +1169,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     }
   }
 
-  test("save API - format hive") {
-    withTempDir { dir =>
-      val path = dir.getCanonicalPath
-      val e = intercept[ClassNotFoundException] {
-        spark.range(10).write.format("hive").mode(SaveMode.Ignore).save(path)
-      }.getMessage
-      assert(e.contains("Failed to find data source: hive"))
-    }
-  }
-
-  test("saveAsTable API - format hive") {
-    val tableName = "tab1"
-    withTable(tableName) {
-      val e = intercept[AnalysisException] {
-        spark.range(10).write.format("hive").mode(SaveMode.Overwrite).saveAsTable(tableName)
-      }.getMessage
-      assert(e.contains("Cannot create hive serde table with saveAsTable API"))
-    }
-  }
-
   test("create a temp view using hive") {
     val tableName = "tab1"
     withTable (tableName) {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 3ac07d0933..77285282a6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.StructType
 
 class HiveDDLSuite
   extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
@@ -1289,4 +1290,80 @@ class HiveDDLSuite
       }
     }
   }
+
+  test("create hive serde table with Catalog") {
+    withTable("t") {
+      withTempDir { dir =>
+        val df = spark.catalog.createExternalTable(
+          "t",
+          "hive",
+          new StructType().add("i", "int"),
+          Map("path" -> dir.getCanonicalPath, "fileFormat" -> "parquet"))
+        assert(df.collect().isEmpty)
+
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(DDLUtils.isHiveTable(table))
+        assert(table.storage.inputFormat ==
+          Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
+        assert(table.storage.outputFormat ==
+          Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
+        assert(table.storage.serde ==
+          Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+
+        sql("INSERT INTO t SELECT 1")
+        checkAnswer(spark.table("t"), Row(1))
+      }
+    }
+  }
+
+  test("create hive serde table with DataFrameWriter.saveAsTable") {
+    withTable("t", "t2") {
+      Seq(1 -> "a").toDF("i", "j")
+        .write.format("hive").option("fileFormat", "avro").saveAsTable("t")
+      checkAnswer(spark.table("t"), Row(1, "a"))
+
+      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+      assert(DDLUtils.isHiveTable(table))
+      assert(table.storage.inputFormat ==
+        Some("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"))
+      assert(table.storage.outputFormat ==
+        Some("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"))
+      assert(table.storage.serde ==
+        Some("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))
+
+      sql("INSERT INTO t SELECT 2, 'b'")
+      checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)
+
+      val e = intercept[AnalysisException] {
+        Seq(1 -> "a").toDF("i", "j").write.format("hive").partitionBy("i").saveAsTable("t2")
+      }
+      assert(e.message.contains("A Create Table As Select (CTAS) statement is not allowed " +
+        "to create a partitioned table using Hive"))
+
+      val e2 = intercept[AnalysisException] {
+        Seq(1 -> "a").toDF("i", "j").write.format("hive").bucketBy(4, "i").saveAsTable("t2")
+      }
+      assert(e2.message.contains("Creating bucketed Hive serde table is not supported yet"))
+
+      val e3 = intercept[AnalysisException] {
+        spark.table("t").write.format("hive").mode("overwrite").saveAsTable("t")
+      }
+      assert(e3.message.contains(
+        "CTAS for hive serde tables does not support append or overwrite semantics"))
+    }
+  }
+
+  test("read/write files with hive data source is not allowed") {
+    withTempDir { dir =>
+      val e = intercept[AnalysisException] {
+        spark.read.format("hive").load(dir.getAbsolutePath)
+      }
+      assert(e.message.contains("Hive data source can only be used with tables"))
+
+      val e2 = intercept[AnalysisException] {
+        Seq(1 -> "a").toDF("i", "j").write.format("hive").save(dir.getAbsolutePath)
+      }
+      assert(e2.message.contains("Hive data source can only be used with tables"))
+    }
+  }
 }
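
For readers who want to try the newly allowed code paths outside the test suite, here is a minimal sketch mirroring the new tests above. It assumes a Spark build that includes this change and a session created with `enableHiveSupport()`; the object name, table names (`t_parquet`, `t_avro`), and the placeholder path are illustrative assumptions, not part of the patch.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

// Illustrative sketch only: object/table names and the path are placeholders.
object HiveTableCreationSketch {
  def main(args: Array[String]): Unit = {
    // The "hive" provider requires a session built with Hive support.
    val spark = SparkSession.builder()
      .appName("hive-table-creation-sketch")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    // Catalog API: external Hive serde table backed by Parquet files at a path.
    spark.catalog.createExternalTable(
      "t_parquet",
      "hive",
      new StructType().add("i", "int"),
      Map("path" -> "/tmp/t_parquet_data", "fileFormat" -> "parquet"))

    // DataFrameWriter API: CTAS into a Hive serde table stored as Avro.
    Seq(1 -> "a").toDF("i", "j")
      .write.format("hive").option("fileFormat", "avro").saveAsTable("t_avro")

    // The created table behaves like any other Hive table, e.g. for INSERT.
    spark.sql("INSERT INTO t_avro SELECT 2, 'b'")
    spark.table("t_avro").show()

    spark.stop()
  }
}
```

Per the new tests, the `hive` source still cannot be used for direct file reads or writes (`spark.read.format("hive").load(...)` and `save(...)` are rejected), and `partitionBy`, `bucketBy`, and overwrite mode with `saveAsTable` on the `hive` format remain unsupported.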