diff options
Diffstat (limited to 'sql/core')
4 files changed, 214 insertions, 8 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index abea7a3bcf..d4f23f9dd5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -742,6 +742,22 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** + * Create a [[AlterTableAddColumnsCommand]] command. + * + * For example: + * {{{ + * ALTER TABLE table1 + * ADD COLUMNS (col_name data_type [COMMENT col_comment], ...); + * }}} + */ + override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = withOrigin(ctx) { + AlterTableAddColumnsCommand( + visitTableIdentifier(ctx.tableIdentifier), + visitColTypeList(ctx.columns) + ) + } + + /** * Create an [[AlterTableSetPropertiesCommand]] command. * * For example: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index beb3dcafd6..93307fc883 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -37,7 +37,10 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTableType._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.util.quoteIdentifier -import org.apache.spark.sql.execution.datasources.PartitioningUtils +import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, PartitioningUtils} +import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat +import org.apache.spark.sql.execution.datasources.json.JsonFileFormat +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -175,6 +178,77 @@ case class AlterTableRenameCommand( } /** + * A command that add columns to a table + * The syntax of using this command in SQL is: + * {{{ + * ALTER TABLE table_identifier + * ADD COLUMNS (col_name data_type [COMMENT col_comment], ...); + * }}} +*/ +case class AlterTableAddColumnsCommand( + table: TableIdentifier, + columns: Seq[StructField]) extends RunnableCommand { + override def run(sparkSession: SparkSession): Seq[Row] = { + val catalog = sparkSession.sessionState.catalog + val catalogTable = verifyAlterTableAddColumn(catalog, table) + + try { + sparkSession.catalog.uncacheTable(table.quotedString) + } catch { + case NonFatal(e) => + log.warn(s"Exception when attempting to uncache table ${table.quotedString}", e) + } + catalog.refreshTable(table) + + // make sure any partition columns are at the end of the fields + val reorderedSchema = catalogTable.dataSchema ++ columns ++ catalogTable.partitionSchema + catalog.alterTableSchema( + table, catalogTable.schema.copy(fields = reorderedSchema.toArray)) + + Seq.empty[Row] + } + + /** + * ALTER TABLE ADD COLUMNS command does not support temporary view/table, + * view, or datasource table with text, orc formats or external provider. + * For datasource table, it currently only supports parquet, json, csv. + */ + private def verifyAlterTableAddColumn( + catalog: SessionCatalog, + table: TableIdentifier): CatalogTable = { + val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table) + + if (catalogTable.tableType == CatalogTableType.VIEW) { + throw new AnalysisException( + s""" + |ALTER ADD COLUMNS does not support views. + |You must drop and re-create the views for adding the new columns. Views: $table + """.stripMargin) + } + + if (DDLUtils.isDatasourceTable(catalogTable)) { + DataSource.lookupDataSource(catalogTable.provider.get).newInstance() match { + // For datasource table, this command can only support the following File format. + // TextFileFormat only default to one column "value" + // OrcFileFormat can not handle difference between user-specified schema and + // inferred schema yet. TODO, once this issue is resolved , we can add Orc back. + // Hive type is already considered as hive serde table, so the logic will not + // come in here. + case _: JsonFileFormat | _: CSVFileFormat | _: ParquetFileFormat => + case s => + throw new AnalysisException( + s""" + |ALTER ADD COLUMNS does not support datasource table with type $s. + |You must drop and re-create the table for adding the new columns. Tables: $table + """.stripMargin) + } + } + catalogTable + } +} + + +/** * A command that loads data into a Hive table. * * The syntax of this command is: diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index 4b73b078da..13202a5785 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -780,13 +780,7 @@ class DDLCommandSuite extends PlanTest { assertUnsupported("ALTER TABLE table_name SKEWED BY (key) ON (1,5,6) STORED AS DIRECTORIES") } - test("alter table: add/replace columns (not allowed)") { - assertUnsupported( - """ - |ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') - |ADD COLUMNS (new_col1 INT COMMENT 'test_comment', new_col2 LONG - |COMMENT 'test_comment2') CASCADE - """.stripMargin) + test("alter table: replace columns (not allowed)") { assertUnsupported( """ |ALTER TABLE table_name REPLACE COLUMNS (new_col1 INT diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 235c6bf6ad..648b1798c6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -2185,4 +2185,126 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { } } } + + val supportedNativeFileFormatsForAlterTableAddColumns = Seq("parquet", "json", "csv") + + supportedNativeFileFormatsForAlterTableAddColumns.foreach { provider => + test(s"alter datasource table add columns - $provider") { + withTable("t1") { + sql(s"CREATE TABLE t1 (c1 int) USING $provider") + sql("INSERT INTO t1 VALUES (1)") + sql("ALTER TABLE t1 ADD COLUMNS (c2 int)") + checkAnswer( + spark.table("t1"), + Seq(Row(1, null)) + ) + checkAnswer( + sql("SELECT * FROM t1 WHERE c2 is null"), + Seq(Row(1, null)) + ) + + sql("INSERT INTO t1 VALUES (3, 2)") + checkAnswer( + sql("SELECT * FROM t1 WHERE c2 = 2"), + Seq(Row(3, 2)) + ) + } + } + } + + supportedNativeFileFormatsForAlterTableAddColumns.foreach { provider => + test(s"alter datasource table add columns - partitioned - $provider") { + withTable("t1") { + sql(s"CREATE TABLE t1 (c1 int, c2 int) USING $provider PARTITIONED BY (c2)") + sql("INSERT INTO t1 PARTITION(c2 = 2) VALUES (1)") + sql("ALTER TABLE t1 ADD COLUMNS (c3 int)") + checkAnswer( + spark.table("t1"), + Seq(Row(1, null, 2)) + ) + checkAnswer( + sql("SELECT * FROM t1 WHERE c3 is null"), + Seq(Row(1, null, 2)) + ) + sql("INSERT INTO t1 PARTITION(c2 =1) VALUES (2, 3)") + checkAnswer( + sql("SELECT * FROM t1 WHERE c3 = 3"), + Seq(Row(2, 3, 1)) + ) + checkAnswer( + sql("SELECT * FROM t1 WHERE c2 = 1"), + Seq(Row(2, 3, 1)) + ) + } + } + } + + test("alter datasource table add columns - text format not supported") { + withTable("t1") { + sql("CREATE TABLE t1 (c1 int) USING text") + val e = intercept[AnalysisException] { + sql("ALTER TABLE t1 ADD COLUMNS (c2 int)") + }.getMessage + assert(e.contains("ALTER ADD COLUMNS does not support datasource table with type")) + } + } + + test("alter table add columns -- not support temp view") { + withTempView("tmp_v") { + sql("CREATE TEMPORARY VIEW tmp_v AS SELECT 1 AS c1, 2 AS c2") + val e = intercept[AnalysisException] { + sql("ALTER TABLE tmp_v ADD COLUMNS (c3 INT)") + } + assert(e.message.contains("ALTER ADD COLUMNS does not support views")) + } + } + + test("alter table add columns -- not support view") { + withView("v1") { + sql("CREATE VIEW v1 AS SELECT 1 AS c1, 2 AS c2") + val e = intercept[AnalysisException] { + sql("ALTER TABLE v1 ADD COLUMNS (c3 INT)") + } + assert(e.message.contains("ALTER ADD COLUMNS does not support views")) + } + } + + test("alter table add columns with existing column name") { + withTable("t1") { + sql("CREATE TABLE t1 (c1 int) USING PARQUET") + val e = intercept[AnalysisException] { + sql("ALTER TABLE t1 ADD COLUMNS (c1 string)") + }.getMessage + assert(e.contains("Found duplicate column(s)")) + } + } + + Seq(true, false).foreach { caseSensitive => + test(s"alter table add columns with existing column name - caseSensitive $caseSensitive") { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> s"$caseSensitive") { + withTable("t1") { + sql("CREATE TABLE t1 (c1 int) USING PARQUET") + if (!caseSensitive) { + val e = intercept[AnalysisException] { + sql("ALTER TABLE t1 ADD COLUMNS (C1 string)") + }.getMessage + assert(e.contains("Found duplicate column(s)")) + } else { + if (isUsingHiveMetastore) { + // hive catalog will still complains that c1 is duplicate column name because hive + // identifiers are case insensitive. + val e = intercept[AnalysisException] { + sql("ALTER TABLE t1 ADD COLUMNS (C1 string)") + }.getMessage + assert(e.contains("HiveException")) + } else { + sql("ALTER TABLE t1 ADD COLUMNS (C1 string)") + assert(spark.table("t1").schema + .equals(new StructType().add("c1", IntegerType).add("C1", StringType))) + } + } + } + } + } + } } |