From aa014eb74bec332ca4d734f2501a4a01a806fa37 Mon Sep 17 00:00:00 2001 From: windpiger Date: Sun, 22 Jan 2017 11:41:27 +0800 Subject: [SPARK-19153][SQL] DataFrameWriter.saveAsTable work with create partitioned table ## What changes were proposed in this pull request? After [SPARK-19107](https://issues.apache.org/jira/browse/SPARK-19107), we can now treat hive as a data source and create hive tables with DataFrameWriter and Catalog. However, the support is not complete; there are still some cases we do not support. This PR enables DataFrameWriter.saveAsTable to work with hive format to create partitioned tables. ## How was this patch tested? unit test added Author: windpiger Closes #16593 from windpiger/saveAsTableWithPartitionedTable. --- .../spark/sql/hive/execution/HiveDDLSuite.scala | 33 ++++++++++++++++------ 1 file changed, 25 insertions(+), 8 deletions(-) (limited to 'sql/hive/src/test/scala/org') diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 235fbd3fc6..41917ccabc 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -1353,12 +1353,6 @@ class HiveDDLSuite sql("INSERT INTO t SELECT 2, 'b'") checkAnswer(spark.table("t"), Row(9, "x") :: Row(2, "b") :: Nil) - val e = intercept[AnalysisException] { - Seq(1 -> "a").toDF("i", "j").write.format("hive").partitionBy("i").saveAsTable("t2") - } - assert(e.message.contains("A Create Table As Select (CTAS) statement is not allowed " + - "to create a partitioned table using Hive")) - val e2 = intercept[AnalysisException] { Seq(1 -> "a").toDF("i", "j").write.format("hive").bucketBy(4, "i").saveAsTable("t2") } @@ -1371,6 +1365,22 @@ class HiveDDLSuite } } + test("create partitioned hive serde table as select") { + withTable("t", "t1") { + 
withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") { + Seq(10 -> "y").toDF("i", "j").write.format("hive").partitionBy("i").saveAsTable("t") + checkAnswer(spark.table("t"), Row("y", 10) :: Nil) + + Seq((1, 2, 3)).toDF("i", "j", "k").write.mode("overwrite").format("hive") + .partitionBy("j", "k").saveAsTable("t") + checkAnswer(spark.table("t"), Row(1, 2, 3) :: Nil) + + spark.sql("create table t1 using hive partitioned by (i) as select 1 as i, 'a' as j") + checkAnswer(spark.table("t1"), Row("a", 1) :: Nil) + } + } + } + test("read/write files with hive data source is not allowed") { withTempDir { dir => val e = intercept[AnalysisException] { @@ -1390,7 +1400,7 @@ class HiveDDLSuite spark.sessionState.catalog.getTableMetadata(TableIdentifier(tblName)).schema.map(_.name) } - withTable("t", "t1", "t2", "t3", "t4") { + withTable("t", "t1", "t2", "t3", "t4", "t5", "t6") { sql("CREATE TABLE t(a int, b int, c int, d int) USING parquet PARTITIONED BY (d, b)") assert(getTableColumns("t") == Seq("a", "c", "d", "b")) @@ -1411,7 +1421,14 @@ class HiveDDLSuite sql("CREATE TABLE t4(a int, b int, c int, d int) USING hive PARTITIONED BY (d, b)") assert(getTableColumns("t4") == Seq("a", "c", "d", "b")) - // TODO: add test for creating partitioned hive serde table as select, once we support it. + withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") { + sql("CREATE TABLE t5 USING hive PARTITIONED BY (d, b) AS SELECT 1 a, 1 b, 1 c, 1 d") + assert(getTableColumns("t5") == Seq("a", "c", "d", "b")) + + Seq((1, 1, 1, 1)).toDF("a", "b", "c", "d").write.format("hive") + .partitionBy("d", "b").saveAsTable("t6") + assert(getTableColumns("t6") == Seq("a", "c", "d", "b")) + } } } } -- cgit v1.2.3