path: root/sql/hive
author     windpiger <songjun@outlook.com>        2017-01-24 20:40:27 +0800
committer  Wenchen Fan <wenchen@databricks.com>   2017-01-24 20:40:27 +0800
commit     3c86fdddf4bb1eac985654f80c3c716b7ae7540b (patch)
tree       9be99b014de4f96cfbd7bd674e936492f8fbc456 /sql/hive
parent     cca8680047bb2ec312ffc296a561abd5cbc8323c (diff)
download   spark-3c86fdddf4bb1eac985654f80c3c716b7ae7540b.tar.gz
spark-3c86fdddf4bb1eac985654f80c3c716b7ae7540b.tar.bz2
spark-3c86fdddf4bb1eac985654f80c3c716b7ae7540b.zip
[SPARK-19152][SQL] DataFrameWriter.saveAsTable support hive append
## What changes were proposed in this pull request?

After [SPARK-19107](https://issues.apache.org/jira/browse/SPARK-19107), we can now treat Hive as a data source and create Hive tables with DataFrameWriter and the Catalog API. However, the support is not yet complete; some cases are still unsupported. This PR implements support for `DataFrameWriter.saveAsTable` on Hive serde tables in append mode.

## How was this patch tested?

Unit tests added.

Author: windpiger <songjun@outlook.com>

Closes #16552 from windpiger/saveAsTableWithHiveAppend.
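A minimal usage sketch of the behavior this patch enables, mirroring the new HiveDDLSuite test (the table name `t` is illustrative):

```scala
import spark.implicits._

// Create a Hive serde table through the DataFrameWriter API.
Seq(1 -> "a").toDF("i", "j")
  .write.format("hive").option("fileFormat", "avro").saveAsTable("t")

// Before this patch, append mode on a Hive serde table threw an AnalysisException;
// with this patch the rows are appended to the existing table.
Seq(2 -> "b").toDF("i", "j")
  .write.format("hive").mode("append").saveAsTable("t")

spark.table("t").show()   // expected rows: (1, "a") and (2, "b")
```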
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister |  1
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala |  9
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala | 22
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala |  9
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 13
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 36
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 17
7 files changed, 76 insertions(+), 31 deletions(-)
diff --git a/sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 32aa13ff25..e7d762fbeb 100644
--- a/sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1 +1,2 @@
org.apache.spark.sql.hive.orc.OrcFileFormat
+org.apache.spark.sql.hive.execution.HiveFileFormat
\ No newline at end of file
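For context, registering HiveFileFormat in this service file is what lets `format("hive")` resolve to it by short name. A minimal sketch of that lookup (an illustration of java.util.ServiceLoader resolution, not code from this patch):

```scala
import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.DataSourceRegister

// Load every DataSourceRegister implementation listed in META-INF/services
// and pick the one whose shortName() matches "hive".
val providers = ServiceLoader.load(classOf[DataSourceRegister]).asScala
val hiveProvider = providers.find(_.shortName().equalsIgnoreCase("hive"))
```

Instantiation through ServiceLoader needs a no-argument constructor, which is why the HiveFileFormat change below adds `def this() = this(null)` along with the `shortName(): String = "hive"` override.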
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 6cde783c5a..9a7111aa3b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -88,18 +88,11 @@ class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)
case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
- // Currently `DataFrameWriter.saveAsTable` doesn't support the Append mode of hive serde
- // tables yet.
- if (mode == SaveMode.Append) {
- throw new AnalysisException(
- "CTAS for hive serde tables does not support append semantics.")
- }
-
val dbName = tableDesc.identifier.database.getOrElse(session.catalog.currentDatabase)
CreateHiveTableAsSelectCommand(
tableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))),
query,
- mode == SaveMode.Ignore)
+ mode)
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 0d30053937..2c754d7fbf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.execution
import scala.util.control.NonFatal
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation}
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.execution.command.RunnableCommand
@@ -31,13 +31,12 @@ import org.apache.spark.sql.hive.MetastoreRelation
*
* @param tableDesc the Table Describe, which may contains serde, storage handler etc.
* @param query the query whose result will be insert into the new relation
- * @param ignoreIfExists allow continue working if it's already exists, otherwise
- * raise exception
+ * @param mode SaveMode
*/
case class CreateHiveTableAsSelectCommand(
tableDesc: CatalogTable,
query: LogicalPlan,
- ignoreIfExists: Boolean)
+ mode: SaveMode)
extends RunnableCommand {
private val tableIdentifier = tableDesc.identifier
@@ -67,7 +66,7 @@ case class CreateHiveTableAsSelectCommand(
withFormat
}
- sparkSession.sessionState.catalog.createTable(withSchema, ignoreIfExists = false)
+ sparkSession.sessionState.catalog.createTable(withSchema, ignoreIfExists = true)
// Get the Metastore Relation
sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
@@ -80,11 +79,18 @@ case class CreateHiveTableAsSelectCommand(
// add the relation into catalog, just in case of failure occurs while data
// processing.
if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) {
- if (ignoreIfExists) {
- // table already exists, will do nothing, to keep consistent with Hive
- } else {
+ assert(mode != SaveMode.Overwrite,
+ s"Expect the table $tableIdentifier has been dropped when the save mode is Overwrite")
+
+ if (mode == SaveMode.ErrorIfExists) {
throw new AnalysisException(s"$tableIdentifier already exists.")
}
+ if (mode == SaveMode.Ignore) {
+ // Since the table already exists and the save mode is Ignore, we will just return.
+ return Seq.empty
+ }
+ sparkSession.sessionState.executePlan(InsertIntoTable(
+ metastoreRelation, Map(), query, overwrite = false, ifNotExists = false)).toRdd
} else {
try {
sparkSession.sessionState.executePlan(InsertIntoTable(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index cc2b60bc41..ac735e8b38 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil}
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
+import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableJobConf
@@ -43,7 +44,13 @@ import org.apache.spark.util.SerializableJobConf
*
* TODO: implement the read logic.
*/
-class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat with Logging {
+class HiveFileFormat(fileSinkConf: FileSinkDesc)
+ extends FileFormat with DataSourceRegister with Logging {
+
+ def this() = this(null)
+
+ override def shortName(): String = "hive"
+
override def inferSchema(
sparkSession: SparkSession,
options: Map[String, String],
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index f0e2c9369b..c262095df6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -419,12 +419,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
sql(s"CREATE TABLE $tableName STORED AS SEQUENCEFILE AS SELECT 1 AS key, 'abc' AS value")
val df = sql(s"SELECT key, value FROM $tableName")
- val e = intercept[AnalysisException] {
- df.write.mode(SaveMode.Append).saveAsTable(tableName)
- }.getMessage
- assert(e.contains("Saving data in the Hive serde table default.tab1 is not supported " +
- "yet. Please use the insertInto() API as an alternative."))
-
df.write.insertInto(tableName)
checkAnswer(
sql(s"SELECT * FROM $tableName"),
@@ -1167,8 +1161,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
test("create a temp view using hive") {
val tableName = "tab1"
- withTable (tableName) {
- val e = intercept[ClassNotFoundException] {
+ withTable(tableName) {
+ val e = intercept[AnalysisException] {
sql(
s"""
|CREATE TEMPORARY VIEW $tableName
@@ -1176,7 +1170,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
|USING hive
""".stripMargin)
}.getMessage
- assert(e.contains("Failed to find data source: hive"))
+ assert(e.contains("Hive data source can only be used with tables, you can't use it with " +
+ "CREATE TEMP VIEW USING"))
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index a77c68339d..2827183456 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1326,7 +1326,7 @@ class HiveDDLSuite
}
test("create hive serde table with DataFrameWriter.saveAsTable") {
- withTable("t", "t2") {
+ withTable("t", "t1") {
Seq(1 -> "a").toDF("i", "j")
.write.format("hive").option("fileFormat", "avro").saveAsTable("t")
checkAnswer(spark.table("t"), Row(1, "a"))
@@ -1357,11 +1357,8 @@ class HiveDDLSuite
assert(table.storage.serde ==
Some("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))
- sql("INSERT INTO t SELECT 2, 'b'")
- checkAnswer(spark.table("t"), Row(9, "x") :: Row(2, "b") :: Nil)
-
val e2 = intercept[AnalysisException] {
- Seq(1 -> "a").toDF("i", "j").write.format("hive").bucketBy(4, "i").saveAsTable("t2")
+ Seq(1 -> "a").toDF("i", "j").write.format("hive").bucketBy(4, "i").saveAsTable("t1")
}
assert(e2.message.contains("Creating bucketed Hive serde table is not supported yet"))
@@ -1372,6 +1369,35 @@ class HiveDDLSuite
}
}
+ test("append data to hive serde table") {
+ withTable("t", "t1") {
+ Seq(1 -> "a").toDF("i", "j")
+ .write.format("hive").option("fileFormat", "avro").saveAsTable("t")
+ checkAnswer(spark.table("t"), Row(1, "a"))
+
+ sql("INSERT INTO t SELECT 2, 'b'")
+ checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)
+
+ Seq(3 -> "c").toDF("i", "j")
+ .write.format("hive").mode("append").saveAsTable("t")
+ checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil)
+
+ Seq("c" -> 3).toDF("i", "j")
+ .write.format("hive").mode("append").saveAsTable("t")
+ checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Row(3, "c")
+ :: Row(null, "3") :: Nil)
+
+ Seq(4 -> "d").toDF("i", "j").write.saveAsTable("t1")
+
+ val e = intercept[AnalysisException] {
+ Seq(5 -> "e").toDF("i", "j")
+ .write.format("hive").mode("append").saveAsTable("t1")
+ }
+ assert(e.message.contains("The format of the existing table default.t1 is " +
+ "`ParquetFileFormat`. It doesn't match the specified format `HiveFileFormat`."))
+ }
+ }
+
test("create partitioned hive serde table as select") {
withTable("t", "t1") {
withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 1a28c4c84a..20f30e48ab 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1461,6 +1461,23 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
})
}
+ test("run sql directly on files - hive") {
+ withTempPath(f => {
+ spark.range(100).toDF.write.parquet(f.getCanonicalPath)
+
+ var e = intercept[AnalysisException] {
+ sql(s"select id from hive.`${f.getCanonicalPath}`")
+ }
+ assert(e.message.contains("Unsupported data source type for direct query on files: hive"))
+
+ // data source type is case insensitive
+ e = intercept[AnalysisException] {
+ sql(s"select id from HIVE.`${f.getCanonicalPath}`")
+ }
+ assert(e.message.contains("Unsupported data source type for direct query on files: HIVE"))
+ })
+ }
+
test("SPARK-8976 Wrong Result for Rollup #1") {
checkAnswer(sql(
"SELECT count(*) AS cnt, key % 5, grouping_id() FROM src GROUP BY key%5 WITH ROLLUP"),