From 4c0ff5f58565f811b65f1a11b6121da007bcbd5f Mon Sep 17 00:00:00 2001 From: Xin Wu Date: Tue, 21 Mar 2017 08:49:54 -0700 Subject: [SPARK-19261][SQL] Alter add columns for Hive serde and some datasource tables ## What changes were proposed in this pull request? Support` ALTER TABLE ADD COLUMNS (...) `syntax for Hive serde and some datasource tables. In this PR, we consider a few aspects: 1. View is not supported for `ALTER ADD COLUMNS` 2. Since tables created in SparkSQL with Hive DDL syntax will populate table properties with schema information, we need make sure the consistency of the schema before and after ALTER operation in order for future use. 3. For embedded-schema type of format, such as `parquet`, we need to make sure that the predicate on the newly-added columns can be evaluated properly, or pushed down properly. In case of the data file does not have the columns for the newly-added columns, such predicates should return as if the column values are NULLs. 4. For datasource table, this feature does not support the following: 4.1 TEXT format, since there is only one default column `value` is inferred for text format data. 4.2 ORC format, since SparkSQL native ORC reader does not support the difference between user-specified-schema and inferred schema from ORC files. 4.3 Third party datasource types that implements RelationProvider, including the built-in JDBC format, since different implementations by the vendors may have different ways to dealing with schema. 4.4 Other datasource types, such as `parquet`, `json`, `csv`, `hive` are supported. 5. Column names being added can not be duplicate of any existing data column or partition column names. Case sensitivity is taken into consideration according to the sql configuration. 6. This feature also supports In-Memory catalog, while Hive support is turned off. ## How was this patch tested? Add new test cases Author: Xin Wu Closes #16626 from xwu0226/alter_add_columns. --- .../spark/sql/hive/execution/HiveDDLSuite.scala | 100 ++++++++++++++++++++- 1 file changed, 99 insertions(+), 1 deletion(-) (limited to 'sql/hive/src/test/scala/org/apache') diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index d752c415c1..04bc79d430 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.sql.test.SQLTestUtils -import org.apache.spark.sql.types.{MetadataBuilder, StructType} +import org.apache.spark.sql.types._ // TODO(gatorsmile): combine HiveCatalogedDDLSuite and HiveDDLSuite class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeAndAfterEach { @@ -112,6 +112,7 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach { import testImplicits._ + val hiveFormats = Seq("PARQUET", "ORC", "TEXTFILE", "SEQUENCEFILE", "RCFILE", "AVRO") override def afterEach(): Unit = { try { @@ -1860,4 +1861,101 @@ class HiveDDLSuite } } } + + hiveFormats.foreach { tableType => + test(s"alter hive serde table add columns -- partitioned - $tableType") { + withTable("tab") { + sql( + s""" + |CREATE TABLE tab (c1 int, c2 int) + |PARTITIONED BY (c3 int) STORED AS $tableType + """.stripMargin) + + sql("INSERT INTO tab PARTITION (c3=1) VALUES (1, 2)") + sql("ALTER TABLE tab ADD COLUMNS (c4 int)") + + checkAnswer( + sql("SELECT * FROM tab WHERE c3 = 1"), + Seq(Row(1, 2, null, 1)) + ) + assert(spark.table("tab").schema + .contains(StructField("c4", IntegerType))) + sql("INSERT INTO tab PARTITION (c3=2) VALUES (2, 3, 4)") + checkAnswer( + spark.table("tab"), + Seq(Row(1, 2, null, 1), Row(2, 3, 4, 2)) + ) + checkAnswer( + sql("SELECT * FROM tab WHERE c3 = 2 AND c4 IS NOT NULL"), + Seq(Row(2, 3, 4, 2)) + ) + + sql("ALTER TABLE tab ADD COLUMNS (c5 char(10))") + assert(spark.table("tab").schema.find(_.name == "c5") + .get.metadata.getString("HIVE_TYPE_STRING") == "char(10)") + } + } + } + + hiveFormats.foreach { tableType => + test(s"alter hive serde table add columns -- with predicate - $tableType ") { + withTable("tab") { + sql(s"CREATE TABLE tab (c1 int, c2 int) STORED AS $tableType") + sql("INSERT INTO tab VALUES (1, 2)") + sql("ALTER TABLE tab ADD COLUMNS (c4 int)") + checkAnswer( + sql("SELECT * FROM tab WHERE c4 IS NULL"), + Seq(Row(1, 2, null)) + ) + assert(spark.table("tab").schema + .contains(StructField("c4", IntegerType))) + sql("INSERT INTO tab VALUES (2, 3, 4)") + checkAnswer( + sql("SELECT * FROM tab WHERE c4 = 4 "), + Seq(Row(2, 3, 4)) + ) + checkAnswer( + spark.table("tab"), + Seq(Row(1, 2, null), Row(2, 3, 4)) + ) + } + } + } + + Seq(true, false).foreach { caseSensitive => + test(s"alter add columns with existing column name - caseSensitive $caseSensitive") { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> s"$caseSensitive") { + withTable("tab") { + sql("CREATE TABLE tab (c1 int) PARTITIONED BY (c2 int) STORED AS PARQUET") + if (!caseSensitive) { + // duplicating partitioning column name + val e1 = intercept[AnalysisException] { + sql("ALTER TABLE tab ADD COLUMNS (C2 string)") + }.getMessage + assert(e1.contains("Found duplicate column(s)")) + + // duplicating data column name + val e2 = intercept[AnalysisException] { + sql("ALTER TABLE tab ADD COLUMNS (C1 string)") + }.getMessage + assert(e2.contains("Found duplicate column(s)")) + } else { + // hive catalog will still complains that c1 is duplicate column name because hive + // identifiers are case insensitive. + val e1 = intercept[AnalysisException] { + sql("ALTER TABLE tab ADD COLUMNS (C2 string)") + }.getMessage + assert(e1.contains("HiveException")) + + // hive catalog will still complains that c1 is duplicate column name because hive + // identifiers are case insensitive. + val e2 = intercept[AnalysisException] { + sql("ALTER TABLE tab ADD COLUMNS (C1 string)") + }.getMessage + assert(e2.contains("HiveException")) + } + } + } + } + } } -- cgit v1.2.3