Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4                |   3
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala      |  56
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala |  29
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala                 |  16
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala                 |  76
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala        |   8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala               | 122
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala              | 100
8 files changed, 400 insertions, 10 deletions
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index cc3b8fd3b4..c4a590ec69 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -85,6 +85,8 @@ statement
LIKE source=tableIdentifier locationSpec? #createTableLike
| ANALYZE TABLE tableIdentifier partitionSpec? COMPUTE STATISTICS
(identifier | FOR COLUMNS identifierSeq)? #analyze
+ | ALTER TABLE tableIdentifier
+ ADD COLUMNS '(' columns=colTypeList ')' #addTableColumns
| ALTER (TABLE | VIEW) from=tableIdentifier
RENAME TO to=tableIdentifier #renameTable
| ALTER (TABLE | VIEW) tableIdentifier
@@ -198,7 +200,6 @@ unsupportedHiveNativeCommands
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=COMPACT
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=CONCATENATE
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=SET kw4=FILEFORMAT
- | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=ADD kw4=COLUMNS
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=REPLACE kw4=COLUMNS
| kw1=START kw2=TRANSACTION
| kw1=COMMIT
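
A minimal usage sketch of the statements the new #addTableColumns alternative accepts (and of REPLACE COLUMNS, which stays unsupported), assuming a SparkSession named spark; the table and column names are illustrative only:

  // Hypothetical sketch exercising the new grammar rule through the SQL API.
  spark.sql("CREATE TABLE t1 (c1 INT) USING parquet")

  // Now parsed by the #addTableColumns rule instead of being rejected as an
  // unsupported Hive native command.
  spark.sql("ALTER TABLE t1 ADD COLUMNS (c2 STRING COMMENT 'added column', c3 INT)")

  // REPLACE COLUMNS still matches unsupportedHiveNativeCommands and throws a ParseException.
  // spark.sql("ALTER TABLE t1 REPLACE COLUMNS (c1 INT)")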
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index b134fd44a3..a469d12451 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View}
import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.types.{StructField, StructType}
object SessionCatalog {
val DEFAULT_DATABASE = "default"
@@ -161,6 +162,20 @@ class SessionCatalog(
throw new TableAlreadyExistsException(db = db, table = name.table)
}
}
+
+ private def checkDuplication(fields: Seq[StructField]): Unit = {
+ val columnNames = if (conf.caseSensitiveAnalysis) {
+ fields.map(_.name)
+ } else {
+ fields.map(_.name.toLowerCase)
+ }
+ if (columnNames.distinct.length != columnNames.length) {
+ val duplicateColumns = columnNames.groupBy(identity).collect {
+ case (x, ys) if ys.length > 1 => x
+ }
+ throw new AnalysisException(s"Found duplicate column(s): ${duplicateColumns.mkString(", ")}")
+ }
+ }
// ----------------------------------------------------------------------------
// Databases
// ----------------------------------------------------------------------------
@@ -296,6 +311,47 @@ class SessionCatalog(
}
/**
+ * Alter the schema of a table identified by the provided table identifier. The new schema
+ * should still contain the existing bucket columns and partition columns used by the table. This
+ * method will also update any Spark SQL-related parameters stored as Hive table properties (such
+ * as the schema itself).
+ *
+ * @param identifier TableIdentifier
+ * @param newSchema Updated schema to be used for the table (must contain existing partition and
+ * bucket columns, and partition columns need to be at the end)
+ */
+ def alterTableSchema(
+ identifier: TableIdentifier,
+ newSchema: StructType): Unit = {
+ val db = formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase))
+ val table = formatTableName(identifier.table)
+ val tableIdentifier = TableIdentifier(table, Some(db))
+ requireDbExists(db)
+ requireTableExists(tableIdentifier)
+ checkDuplication(newSchema)
+
+ val catalogTable = externalCatalog.getTable(db, table)
+ val oldSchema = catalogTable.schema
+
+ // not supporting dropping columns yet
+ val nonExistentColumnNames = oldSchema.map(_.name).filterNot(columnNameResolved(newSchema, _))
+ if (nonExistentColumnNames.nonEmpty) {
+ throw new AnalysisException(
+ s"""
+ |Some existing schema fields (${nonExistentColumnNames.mkString("[", ",", "]")}) are
+ |not present in the new schema. We don't support dropping columns yet.
+ """.stripMargin)
+ }
+
+ // assuming the newSchema has all partition columns at the end as required
+ externalCatalog.alterTableSchema(db, table, newSchema)
+ }
+
+ private def columnNameResolved(schema: StructType, colName: String): Boolean = {
+ schema.fields.map(_.name).exists(conf.resolver(_, colName))
+ }
+
+ /**
* Return whether a table/view with the specified name exists. If no database is specified, check
* with current database.
*/
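
A hedged sketch of the alterTableSchema contract documented above, assuming a SessionCatalog named catalog and an existing table db.t1 whose data schema is (c1 INT) and whose partition schema is (p STRING):

  import org.apache.spark.sql.catalyst.TableIdentifier
  import org.apache.spark.sql.types._

  val ident = TableIdentifier("t1", Some("db"))
  val oldTable = catalog.externalCatalog.getTable("db", "t1")

  // OK: all existing columns are kept, the new column is appended to the data
  // schema, and the partition columns remain at the end.
  catalog.alterTableSchema(
    ident,
    StructType(oldTable.dataSchema.add("c2", IntegerType) ++ oldTable.partitionSchema))

  // Fails with "Found duplicate column(s): c1" under case-insensitive analysis.
  // catalog.alterTableSchema(ident, StructType(oldTable.schema.add("C1", StringType)))

  // Fails with "We don't support dropping columns yet." because c1 would be removed.
  // catalog.alterTableSchema(ident, StructType(oldTable.schema.drop(1)))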
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index fd9e5d6bb1..ca4ce1c117 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View}
+import org.apache.spark.sql.types._
class InMemorySessionCatalogSuite extends SessionCatalogSuite {
protected val utils = new CatalogTestUtils {
@@ -448,6 +449,34 @@ abstract class SessionCatalogSuite extends PlanTest {
}
}
+ test("alter table add columns") {
+ withBasicCatalog { sessionCatalog =>
+ sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
+ val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+ sessionCatalog.alterTableSchema(
+ TableIdentifier("t1", Some("default")),
+ StructType(oldTab.dataSchema.add("c3", IntegerType) ++ oldTab.partitionSchema))
+
+ val newTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+ // construct the expected table schema
+ val expectedTableSchema = StructType(oldTab.dataSchema.fields ++
+ Seq(StructField("c3", IntegerType)) ++ oldTab.partitionSchema)
+ assert(newTab.schema == expectedTableSchema)
+ }
+ }
+
+ test("alter table drop columns") {
+ withBasicCatalog { sessionCatalog =>
+ sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
+ val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+ val e = intercept[AnalysisException] {
+ sessionCatalog.alterTableSchema(
+ TableIdentifier("t1", Some("default")), StructType(oldTab.schema.drop(1)))
+ }.getMessage
+ assert(e.contains("We don't support dropping columns yet."))
+ }
+ }
+
test("get table") {
withBasicCatalog { catalog =>
assert(catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index abea7a3bcf..d4f23f9dd5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -742,6 +742,22 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
/**
+ * Create a [[AlterTableAddColumnsCommand]] command.
+ *
+ * For example:
+ * {{{
+ * ALTER TABLE table1
+ * ADD COLUMNS (col_name data_type [COMMENT col_comment], ...);
+ * }}}
+ */
+ override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = withOrigin(ctx) {
+ AlterTableAddColumnsCommand(
+ visitTableIdentifier(ctx.tableIdentifier),
+ visitColTypeList(ctx.columns)
+ )
+ }
+
+ /**
* Create an [[AlterTableSetPropertiesCommand]] command.
*
* For example:
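
A rough sketch of the logical plan the new visitor builds, assuming a SparkSqlParser instance named parser:

  import org.apache.spark.sql.catalyst.TableIdentifier
  import org.apache.spark.sql.execution.command.AlterTableAddColumnsCommand
  import org.apache.spark.sql.types.{IntegerType, StructField}

  val plan = parser.parsePlan("ALTER TABLE t1 ADD COLUMNS (c2 int COMMENT 'new column')")
  // Expected to be roughly equivalent to:
  //   AlterTableAddColumnsCommand(
  //     TableIdentifier("t1"),
  //     Seq(StructField("c2", IntegerType).withComment("new column")))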
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index beb3dcafd6..93307fc883 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -37,7 +37,10 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, PartitioningUtils}
+import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
+import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -175,6 +178,77 @@ case class AlterTableRenameCommand(
}
/**
+ * A command that adds columns to a table.
+ * The syntax of this command in SQL is:
+ * {{{
+ * ALTER TABLE table_identifier
+ * ADD COLUMNS (col_name data_type [COMMENT col_comment], ...);
+ * }}}
+ */
+case class AlterTableAddColumnsCommand(
+ table: TableIdentifier,
+ columns: Seq[StructField]) extends RunnableCommand {
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val catalog = sparkSession.sessionState.catalog
+ val catalogTable = verifyAlterTableAddColumn(catalog, table)
+
+ try {
+ sparkSession.catalog.uncacheTable(table.quotedString)
+ } catch {
+ case NonFatal(e) =>
+ log.warn(s"Exception when attempting to uncache table ${table.quotedString}", e)
+ }
+ catalog.refreshTable(table)
+
+ // make sure any partition columns are at the end of the fields
+ val reorderedSchema = catalogTable.dataSchema ++ columns ++ catalogTable.partitionSchema
+ catalog.alterTableSchema(
+ table, catalogTable.schema.copy(fields = reorderedSchema.toArray))
+
+ Seq.empty[Row]
+ }
+
+ /**
+ * The ALTER TABLE ADD COLUMNS command does not support temporary views/tables, views,
+ * or datasource tables backed by the text or ORC formats, or by an external provider.
+ * For datasource tables, it currently supports only Parquet, JSON, and CSV.
+ */
+ private def verifyAlterTableAddColumn(
+ catalog: SessionCatalog,
+ table: TableIdentifier): CatalogTable = {
+ val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table)
+
+ if (catalogTable.tableType == CatalogTableType.VIEW) {
+ throw new AnalysisException(
+ s"""
+ |ALTER ADD COLUMNS does not support views.
+ |You must drop and re-create the view to add the new columns. Views: $table
+ """.stripMargin)
+ }
+
+ if (DDLUtils.isDatasourceTable(catalogTable)) {
+ DataSource.lookupDataSource(catalogTable.provider.get).newInstance() match {
+ // For a datasource table, this command only supports the following file formats.
+ // TextFileFormat defaults to a single column "value".
+ // OrcFileFormat cannot yet handle a mismatch between the user-specified schema and
+ // the inferred schema. TODO: once this issue is resolved, we can add ORC back.
+ // Tables with the Hive provider are treated as Hive serde tables, so they never
+ // reach this branch.
+ case _: JsonFileFormat | _: CSVFileFormat | _: ParquetFileFormat =>
+ case s =>
+ throw new AnalysisException(
+ s"""
+ |ALTER ADD COLUMNS does not support datasource table with type $s.
+ |You must drop and re-create the table to add the new columns. Tables: $table
+ """.stripMargin)
+ }
+ }
+ catalogTable
+ }
+}
+
+
+/**
* A command that loads data into a Hive table.
*
* The syntax of this command is:
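
An end-to-end sketch of the run/verify behavior of AlterTableAddColumnsCommand, assuming a SparkSession named spark; table and column names are illustrative:

  spark.sql("CREATE TABLE logs (id INT) USING parquet")
  spark.sql("INSERT INTO logs VALUES (1)")

  // Uncaches and refreshes the table, then appends the new column to the data
  // schema; partition columns (none here) would stay at the end.
  spark.sql("ALTER TABLE logs ADD COLUMNS (msg STRING)")
  spark.sql("SELECT * FROM logs").show()   // existing rows read msg as NULL

  // Rejected by verifyAlterTableAddColumn:
  // spark.sql("ALTER TABLE some_view ADD COLUMNS (x INT)")   // views are not supported
  // spark.sql("ALTER TABLE text_tbl ADD COLUMNS (x INT)")    // text datasource tables are not supported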
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 4b73b078da..13202a5785 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -780,13 +780,7 @@ class DDLCommandSuite extends PlanTest {
assertUnsupported("ALTER TABLE table_name SKEWED BY (key) ON (1,5,6) STORED AS DIRECTORIES")
}
- test("alter table: add/replace columns (not allowed)") {
- assertUnsupported(
- """
- |ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us')
- |ADD COLUMNS (new_col1 INT COMMENT 'test_comment', new_col2 LONG
- |COMMENT 'test_comment2') CASCADE
- """.stripMargin)
+ test("alter table: replace columns (not allowed)") {
assertUnsupported(
"""
|ALTER TABLE table_name REPLACE COLUMNS (new_col1 INT
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 235c6bf6ad..648b1798c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2185,4 +2185,126 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}
}
+
+ val supportedNativeFileFormatsForAlterTableAddColumns = Seq("parquet", "json", "csv")
+
+ supportedNativeFileFormatsForAlterTableAddColumns.foreach { provider =>
+ test(s"alter datasource table add columns - $provider") {
+ withTable("t1") {
+ sql(s"CREATE TABLE t1 (c1 int) USING $provider")
+ sql("INSERT INTO t1 VALUES (1)")
+ sql("ALTER TABLE t1 ADD COLUMNS (c2 int)")
+ checkAnswer(
+ spark.table("t1"),
+ Seq(Row(1, null))
+ )
+ checkAnswer(
+ sql("SELECT * FROM t1 WHERE c2 is null"),
+ Seq(Row(1, null))
+ )
+
+ sql("INSERT INTO t1 VALUES (3, 2)")
+ checkAnswer(
+ sql("SELECT * FROM t1 WHERE c2 = 2"),
+ Seq(Row(3, 2))
+ )
+ }
+ }
+ }
+
+ supportedNativeFileFormatsForAlterTableAddColumns.foreach { provider =>
+ test(s"alter datasource table add columns - partitioned - $provider") {
+ withTable("t1") {
+ sql(s"CREATE TABLE t1 (c1 int, c2 int) USING $provider PARTITIONED BY (c2)")
+ sql("INSERT INTO t1 PARTITION(c2 = 2) VALUES (1)")
+ sql("ALTER TABLE t1 ADD COLUMNS (c3 int)")
+ checkAnswer(
+ spark.table("t1"),
+ Seq(Row(1, null, 2))
+ )
+ checkAnswer(
+ sql("SELECT * FROM t1 WHERE c3 is null"),
+ Seq(Row(1, null, 2))
+ )
+ sql("INSERT INTO t1 PARTITION(c2 =1) VALUES (2, 3)")
+ checkAnswer(
+ sql("SELECT * FROM t1 WHERE c3 = 3"),
+ Seq(Row(2, 3, 1))
+ )
+ checkAnswer(
+ sql("SELECT * FROM t1 WHERE c2 = 1"),
+ Seq(Row(2, 3, 1))
+ )
+ }
+ }
+ }
+
+ test("alter datasource table add columns - text format not supported") {
+ withTable("t1") {
+ sql("CREATE TABLE t1 (c1 int) USING text")
+ val e = intercept[AnalysisException] {
+ sql("ALTER TABLE t1 ADD COLUMNS (c2 int)")
+ }.getMessage
+ assert(e.contains("ALTER ADD COLUMNS does not support datasource table with type"))
+ }
+ }
+
+ test("alter table add columns -- not support temp view") {
+ withTempView("tmp_v") {
+ sql("CREATE TEMPORARY VIEW tmp_v AS SELECT 1 AS c1, 2 AS c2")
+ val e = intercept[AnalysisException] {
+ sql("ALTER TABLE tmp_v ADD COLUMNS (c3 INT)")
+ }
+ assert(e.message.contains("ALTER ADD COLUMNS does not support views"))
+ }
+ }
+
+ test("alter table add columns -- not support view") {
+ withView("v1") {
+ sql("CREATE VIEW v1 AS SELECT 1 AS c1, 2 AS c2")
+ val e = intercept[AnalysisException] {
+ sql("ALTER TABLE v1 ADD COLUMNS (c3 INT)")
+ }
+ assert(e.message.contains("ALTER ADD COLUMNS does not support views"))
+ }
+ }
+
+ test("alter table add columns with existing column name") {
+ withTable("t1") {
+ sql("CREATE TABLE t1 (c1 int) USING PARQUET")
+ val e = intercept[AnalysisException] {
+ sql("ALTER TABLE t1 ADD COLUMNS (c1 string)")
+ }.getMessage
+ assert(e.contains("Found duplicate column(s)"))
+ }
+ }
+
+ Seq(true, false).foreach { caseSensitive =>
+ test(s"alter table add columns with existing column name - caseSensitive $caseSensitive") {
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> s"$caseSensitive") {
+ withTable("t1") {
+ sql("CREATE TABLE t1 (c1 int) USING PARQUET")
+ if (!caseSensitive) {
+ val e = intercept[AnalysisException] {
+ sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+ }.getMessage
+ assert(e.contains("Found duplicate column(s)"))
+ } else {
+ if (isUsingHiveMetastore) {
+ // The Hive catalog will still complain that c1 is a duplicate column name because
+ // Hive identifiers are case insensitive.
+ val e = intercept[AnalysisException] {
+ sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+ }.getMessage
+ assert(e.contains("HiveException"))
+ } else {
+ sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+ assert(spark.table("t1").schema
+ .equals(new StructType().add("c1", IntegerType).add("C1", StringType)))
+ }
+ }
+ }
+ }
+ }
+ }
}
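
A short sketch of the case-sensitivity behavior these tests exercise, assuming a SparkSession named spark backed by the in-memory catalog:

  spark.sql("CREATE TABLE t1 (c1 INT) USING parquet")

  spark.conf.set("spark.sql.caseSensitive", "false")
  // Fails: "Found duplicate column(s): c1" because C1 collides with c1.
  // spark.sql("ALTER TABLE t1 ADD COLUMNS (C1 STRING)")

  spark.conf.set("spark.sql.caseSensitive", "true")
  // Succeeds against the in-memory catalog; the resulting schema is (c1 INT, C1 STRING).
  // A Hive metastore would still reject it, since Hive identifiers are case insensitive.
  spark.sql("ALTER TABLE t1 ADD COLUMNS (C1 STRING)")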
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index d752c415c1..04bc79d430 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.types.{MetadataBuilder, StructType}
+import org.apache.spark.sql.types._
// TODO(gatorsmile): combine HiveCatalogedDDLSuite and HiveDDLSuite
class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeAndAfterEach {
@@ -112,6 +112,7 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
class HiveDDLSuite
extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
import testImplicits._
+ val hiveFormats = Seq("PARQUET", "ORC", "TEXTFILE", "SEQUENCEFILE", "RCFILE", "AVRO")
override def afterEach(): Unit = {
try {
@@ -1860,4 +1861,101 @@ class HiveDDLSuite
}
}
}
+
+ hiveFormats.foreach { tableType =>
+ test(s"alter hive serde table add columns -- partitioned - $tableType") {
+ withTable("tab") {
+ sql(
+ s"""
+ |CREATE TABLE tab (c1 int, c2 int)
+ |PARTITIONED BY (c3 int) STORED AS $tableType
+ """.stripMargin)
+
+ sql("INSERT INTO tab PARTITION (c3=1) VALUES (1, 2)")
+ sql("ALTER TABLE tab ADD COLUMNS (c4 int)")
+
+ checkAnswer(
+ sql("SELECT * FROM tab WHERE c3 = 1"),
+ Seq(Row(1, 2, null, 1))
+ )
+ assert(spark.table("tab").schema
+ .contains(StructField("c4", IntegerType)))
+ sql("INSERT INTO tab PARTITION (c3=2) VALUES (2, 3, 4)")
+ checkAnswer(
+ spark.table("tab"),
+ Seq(Row(1, 2, null, 1), Row(2, 3, 4, 2))
+ )
+ checkAnswer(
+ sql("SELECT * FROM tab WHERE c3 = 2 AND c4 IS NOT NULL"),
+ Seq(Row(2, 3, 4, 2))
+ )
+
+ sql("ALTER TABLE tab ADD COLUMNS (c5 char(10))")
+ assert(spark.table("tab").schema.find(_.name == "c5")
+ .get.metadata.getString("HIVE_TYPE_STRING") == "char(10)")
+ }
+ }
+ }
+
+ hiveFormats.foreach { tableType =>
+ test(s"alter hive serde table add columns -- with predicate - $tableType ") {
+ withTable("tab") {
+ sql(s"CREATE TABLE tab (c1 int, c2 int) STORED AS $tableType")
+ sql("INSERT INTO tab VALUES (1, 2)")
+ sql("ALTER TABLE tab ADD COLUMNS (c4 int)")
+ checkAnswer(
+ sql("SELECT * FROM tab WHERE c4 IS NULL"),
+ Seq(Row(1, 2, null))
+ )
+ assert(spark.table("tab").schema
+ .contains(StructField("c4", IntegerType)))
+ sql("INSERT INTO tab VALUES (2, 3, 4)")
+ checkAnswer(
+ sql("SELECT * FROM tab WHERE c4 = 4 "),
+ Seq(Row(2, 3, 4))
+ )
+ checkAnswer(
+ spark.table("tab"),
+ Seq(Row(1, 2, null), Row(2, 3, 4))
+ )
+ }
+ }
+ }
+
+ Seq(true, false).foreach { caseSensitive =>
+ test(s"alter add columns with existing column name - caseSensitive $caseSensitive") {
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> s"$caseSensitive") {
+ withTable("tab") {
+ sql("CREATE TABLE tab (c1 int) PARTITIONED BY (c2 int) STORED AS PARQUET")
+ if (!caseSensitive) {
+ // duplicates the partition column name
+ val e1 = intercept[AnalysisException] {
+ sql("ALTER TABLE tab ADD COLUMNS (C2 string)")
+ }.getMessage
+ assert(e1.contains("Found duplicate column(s)"))
+
+ // duplicates a data column name
+ val e2 = intercept[AnalysisException] {
+ sql("ALTER TABLE tab ADD COLUMNS (C1 string)")
+ }.getMessage
+ assert(e2.contains("Found duplicate column(s)"))
+ } else {
+ // The Hive catalog will still complain that c2 is a duplicate column name because
+ // Hive identifiers are case insensitive.
+ val e1 = intercept[AnalysisException] {
+ sql("ALTER TABLE tab ADD COLUMNS (C2 string)")
+ }.getMessage
+ assert(e1.contains("HiveException"))
+
+ // The Hive catalog will still complain that c1 is a duplicate column name because
+ // Hive identifiers are case insensitive.
+ val e2 = intercept[AnalysisException] {
+ sql("ALTER TABLE tab ADD COLUMNS (C1 string)")
+ }.getMessage
+ assert(e2.contains("HiveException"))
+ }
+ }
+ }
+ }
+ }
}