aboutsummaryrefslogtreecommitdiff
path: root/sql/hive
diff options
context:
space:
mode:
authorwindpiger <songjun@outlook.com>2017-01-22 11:41:27 +0800
committerWenchen Fan <wenchen@databricks.com>2017-01-22 11:41:27 +0800
commitaa014eb74bec332ca4d734f2501a4a01a806fa37 (patch)
tree36b8540260344fb181dc0127d858920a06ff5493 /sql/hive
parent6113fe78a5195d3325690703b20000bed6e9efa5 (diff)
downloadspark-aa014eb74bec332ca4d734f2501a4a01a806fa37.tar.gz
spark-aa014eb74bec332ca4d734f2501a4a01a806fa37.tar.bz2
spark-aa014eb74bec332ca4d734f2501a4a01a806fa37.zip
[SPARK-19153][SQL] DataFrameWriter.saveAsTable should work with hive format to create partitioned tables
## What changes were proposed in this pull request? After [SPARK-19107](https://issues.apache.org/jira/browse/SPARK-19107), we can now treat hive as a data source and create hive tables with DataFrameWriter and Catalog. However, the support is not complete; there are still some cases we do not support. This PR makes DataFrameWriter.saveAsTable work with hive format to create partitioned tables. ## How was this patch tested? unit test added Author: windpiger <songjun@outlook.com> Closes #16593 from windpiger/saveAsTableWithPartitionedTable.
Diffstat (limited to 'sql/hive')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala8
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala4
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala33
3 files changed, 26 insertions, 19 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index b649612a40..838e6f4008 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -40,14 +40,6 @@ class DetermineHiveSerde(conf: SQLConf) extends Rule[LogicalPlan] {
if (t.bucketSpec.isDefined) {
throw new AnalysisException("Creating bucketed Hive serde table is not supported yet.")
}
- if (t.partitionColumnNames.nonEmpty && query.isDefined) {
- val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " +
- "create a partitioned table using Hive's file formats. " +
- "Please use the syntax of \"CREATE TABLE tableName USING dataSource " +
- "OPTIONS (...) PARTITIONED BY ...\" to create a partitioned table through a " +
- "CTAS statement."
- throw new AnalysisException(errorMessage)
- }
val defaultStorage = HiveSerDe.getDefaultStorage(conf)
val options = new HiveOptions(t.storage.properties)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index ccc2d64c4a..0d30053937 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -62,9 +62,7 @@ case class CreateHiveTableAsSelectCommand(
compressed = tableDesc.storage.compressed)
val withSchema = if (withFormat.schema.isEmpty) {
- // Hive doesn't support specifying the column list for target table in CTAS
- // However we don't think SparkSQL should follow that.
- tableDesc.copy(schema = query.output.toStructType)
+ tableDesc.copy(schema = query.schema)
} else {
withFormat
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 235fbd3fc6..41917ccabc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1353,12 +1353,6 @@ class HiveDDLSuite
sql("INSERT INTO t SELECT 2, 'b'")
checkAnswer(spark.table("t"), Row(9, "x") :: Row(2, "b") :: Nil)
- val e = intercept[AnalysisException] {
- Seq(1 -> "a").toDF("i", "j").write.format("hive").partitionBy("i").saveAsTable("t2")
- }
- assert(e.message.contains("A Create Table As Select (CTAS) statement is not allowed " +
- "to create a partitioned table using Hive"))
-
val e2 = intercept[AnalysisException] {
Seq(1 -> "a").toDF("i", "j").write.format("hive").bucketBy(4, "i").saveAsTable("t2")
}
@@ -1371,6 +1365,22 @@ class HiveDDLSuite
}
}
+ test("create partitioned hive serde table as select") {
+ withTable("t", "t1") {
+ withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+ Seq(10 -> "y").toDF("i", "j").write.format("hive").partitionBy("i").saveAsTable("t")
+ checkAnswer(spark.table("t"), Row("y", 10) :: Nil)
+
+ Seq((1, 2, 3)).toDF("i", "j", "k").write.mode("overwrite").format("hive")
+ .partitionBy("j", "k").saveAsTable("t")
+ checkAnswer(spark.table("t"), Row(1, 2, 3) :: Nil)
+
+ spark.sql("create table t1 using hive partitioned by (i) as select 1 as i, 'a' as j")
+ checkAnswer(spark.table("t1"), Row("a", 1) :: Nil)
+ }
+ }
+ }
+
test("read/write files with hive data source is not allowed") {
withTempDir { dir =>
val e = intercept[AnalysisException] {
@@ -1390,7 +1400,7 @@ class HiveDDLSuite
spark.sessionState.catalog.getTableMetadata(TableIdentifier(tblName)).schema.map(_.name)
}
- withTable("t", "t1", "t2", "t3", "t4") {
+ withTable("t", "t1", "t2", "t3", "t4", "t5", "t6") {
sql("CREATE TABLE t(a int, b int, c int, d int) USING parquet PARTITIONED BY (d, b)")
assert(getTableColumns("t") == Seq("a", "c", "d", "b"))
@@ -1411,7 +1421,14 @@ class HiveDDLSuite
sql("CREATE TABLE t4(a int, b int, c int, d int) USING hive PARTITIONED BY (d, b)")
assert(getTableColumns("t4") == Seq("a", "c", "d", "b"))
- // TODO: add test for creating partitioned hive serde table as select, once we support it.
+ withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+ sql("CREATE TABLE t5 USING hive PARTITIONED BY (d, b) AS SELECT 1 a, 1 b, 1 c, 1 d")
+ assert(getTableColumns("t5") == Seq("a", "c", "d", "b"))
+
+ Seq((1, 1, 1, 1)).toDF("a", "b", "c", "d").write.format("hive")
+ .partitionBy("d", "b").saveAsTable("t6")
+ assert(getTableColumns("t6") == Seq("a", "c", "d", "b"))
+ }
}
}
}