-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala    9
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala             72
2 files changed, 81 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 865e406ce2..4918780873 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -197,6 +197,15 @@ case class CreateDataSourceTableAsSelectCommand(
         EliminateSubqueryAliases(
           sessionState.catalog.lookupRelation(tableIdent)) match {
           case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
+            // check if the file formats match
+            l.relation match {
+              case r: HadoopFsRelation if r.fileFormat.getClass != dataSource.providingClass =>
+                throw new AnalysisException(
+                  s"The file format of the existing table $tableIdent is " +
+                    s"`${r.fileFormat.getClass.getName}`. It doesn't match the specified " +
+                    s"format `$provider`")
+              case _ =>
+            }
             if (query.schema.size != l.schema.size) {
               throw new AnalysisException(
                 s"The column number of the existing schema[${l.schema}] " +
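
For context, a minimal sketch of the user-facing effect of the new check (hypothetical table name `t`; assumes a Spark 2.x session with spark.implicits._ and org.apache.spark.sql.SaveMode imported):

    val df = Seq(1 -> "a").toDF("c1", "c2")
    df.write.format("parquet").saveAsTable("t")

    // Appending with a different file format now fails fast with an
    // AnalysisException rather than proceeding with a mismatched format:
    df.write.mode(SaveMode.Append).format("orc").saveAsTable("t")
    // => org.apache.spark.sql.AnalysisException: The file format of the existing table `t`
    //    is `org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat`.
    //    It doesn't match the specified format `orc`
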
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 3d8123d3c0..b028d49aff 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -891,6 +891,78 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     }
   }
 
+  test("append table using different formats") {
+    def createDF(from: Int, to: Int): DataFrame = {
+      (from to to).map(i => i -> s"str$i").toDF("c1", "c2")
+    }
+
+    withTable("appendOrcToParquet") {
+      createDF(0, 9).write.format("parquet").saveAsTable("appendOrcToParquet")
+      val e = intercept[AnalysisException] {
+        createDF(10, 19).write.mode(SaveMode.Append).format("orc").saveAsTable("appendOrcToParquet")
+      }
+      assert(e.getMessage.contains("The file format of the existing table `appendOrcToParquet` " +
+        "is `org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat`. " +
+        "It doesn't match the specified format `orc`"))
+    }
+
+    withTable("appendParquetToJson") {
+      createDF(0, 9).write.format("json").saveAsTable("appendParquetToJson")
+      val e = intercept[AnalysisException] {
+        createDF(10, 19).write.mode(SaveMode.Append).format("parquet")
+          .saveAsTable("appendParquetToJson")
+      }
+      assert(e.getMessage.contains("The file format of the existing table `appendParquetToJson` " +
+        "is `org.apache.spark.sql.execution.datasources.json.JsonFileFormat`. " +
+        "It doesn't match the specified format `parquet`"))
+    }
+
+    withTable("appendTextToJson") {
+      createDF(0, 9).write.format("json").saveAsTable("appendTextToJson")
+      val e = intercept[AnalysisException] {
+        createDF(10, 19).write.mode(SaveMode.Append).format("text")
+          .saveAsTable("appendTextToJson")
+      }
+      assert(e.getMessage.contains("The file format of the existing table `appendTextToJson` is " +
+        "`org.apache.spark.sql.execution.datasources.json.JsonFileFormat`. " +
+        "It doesn't match the specified format `text`"))
+    }
+  }
+
+  test("append a table using the same formats but different names") {
+    def createDF(from: Int, to: Int): DataFrame = {
+      (from to to).map(i => i -> s"str$i").toDF("c1", "c2")
+    }
+
+    withTable("appendParquet") {
+      createDF(0, 9).write.format("parquet").saveAsTable("appendParquet")
+      createDF(10, 19).write.mode(SaveMode.Append).format("org.apache.spark.sql.parquet")
+        .saveAsTable("appendParquet")
+      checkAnswer(
+        sql("SELECT p.c1, p.c2 FROM appendParquet p WHERE p.c1 > 5"),
+        (6 to 19).map(i => Row(i, s"str$i")))
+    }
+
+    withTable("appendParquet") {
+      createDF(0, 9).write.format("org.apache.spark.sql.parquet").saveAsTable("appendParquet")
+      createDF(10, 19).write.mode(SaveMode.Append).format("parquet").saveAsTable("appendParquet")
+      checkAnswer(
+        sql("SELECT p.c1, p.c2 FROM appendParquet p WHERE p.c1 > 5"),
+        (6 to 19).map(i => Row(i, s"str$i")))
+    }
+
+    withTable("appendParquet") {
+      createDF(0, 9).write.format("org.apache.spark.sql.parquet.DefaultSource")
+        .saveAsTable("appendParquet")
+      createDF(10, 19).write.mode(SaveMode.Append)
+        .format("org.apache.spark.sql.execution.datasources.parquet.DefaultSource")
+        .saveAsTable("appendParquet")
+      checkAnswer(
+        sql("SELECT p.c1, p.c2 FROM appendParquet p WHERE p.c1 > 5"),
+        (6 to 19).map(i => Row(i, s"str$i")))
+    }
+  }
+
   test("SPARK-8156:create table to specific database by 'use dbname' ") {
     val df = (1 to 3).map(i => (i, s"val_$i", i * 2)).toDF("a", "b", "c")
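
The "same formats but different names" cases pass because the new check compares resolved classes (r.fileFormat.getClass vs. dataSource.providingClass) rather than raw provider strings. A rough sketch of that resolution, assuming the Spark 2.0-era DataSource.lookupDataSource(provider: String) helper:

    import org.apache.spark.sql.execution.datasources.DataSource

    // Short names ("parquet"), legacy package names ("org.apache.spark.sql.parquet"),
    // and DefaultSource class names are aliases that resolve to the same
    // FileFormat class, so the getClass equality check passes on append.
    val a = DataSource.lookupDataSource("parquet")
    val b = DataSource.lookupDataSource("org.apache.spark.sql.parquet")
    assert(a == b)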