Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala              16
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala              76
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala      8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala            122
4 files changed, 214 insertions(+), 8 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index abea7a3bcf..d4f23f9dd5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -742,6 +742,22 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
/**
+ * Create an [[AlterTableAddColumnsCommand]] command.
+ *
+ * For example:
+ * {{{
+ * ALTER TABLE table1
+ * ADD COLUMNS (col_name data_type [COMMENT col_comment], ...);
+ * }}}
+ */
+ override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = withOrigin(ctx) {
+ AlterTableAddColumnsCommand(
+ visitTableIdentifier(ctx.tableIdentifier),
+ visitColTypeList(ctx.columns)
+ )
+ }
+
+ /**
* Create an [[AlterTableSetPropertiesCommand]] command.
*
* For example:
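As a quick check of the new grammar rule, the command can be exercised through the parser directly. A minimal sketch, assuming a test-style setup where a `SparkSqlParser` is built from a fresh `SQLConf` (the table and column names are illustrative):

{{{
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.execution.command.AlterTableAddColumnsCommand
import org.apache.spark.sql.internal.SQLConf

val parser = new SparkSqlParser(new SQLConf)
val plan = parser.parsePlan(
  "ALTER TABLE table1 ADD COLUMNS (col1 INT COMMENT 'a new column')")

// The new rule should produce the command with the parsed column list.
plan match {
  case AlterTableAddColumnsCommand(table, columns) =>
    assert(table == TableIdentifier("table1"))
    assert(columns.map(_.name) == Seq("col1"))
  case other =>
    sys.error(s"expected AlterTableAddColumnsCommand, got $other")
}
}}}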
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index beb3dcafd6..93307fc883 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -37,7 +37,10 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, PartitioningUtils}
+import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
+import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -175,6 +178,77 @@ case class AlterTableRenameCommand(
}
/**
+ * A command that adds columns to a table.
+ *
+ * The syntax of this command in SQL is:
+ * {{{
+ *   ALTER TABLE table_identifier
+ *   ADD COLUMNS (col_name data_type [COMMENT col_comment], ...);
+ * }}}
+ */
+case class AlterTableAddColumnsCommand(
+ table: TableIdentifier,
+ columns: Seq[StructField]) extends RunnableCommand {
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val catalog = sparkSession.sessionState.catalog
+ val catalogTable = verifyAlterTableAddColumn(catalog, table)
+
+ try {
+ sparkSession.catalog.uncacheTable(table.quotedString)
+ } catch {
+ case NonFatal(e) =>
+ log.warn(s"Exception when attempting to uncache table ${table.quotedString}", e)
+ }
+ catalog.refreshTable(table)
+
+ // Make sure any partition columns stay at the end of the fields.
+ val reorderedSchema = catalogTable.dataSchema ++ columns ++ catalogTable.partitionSchema
+ catalog.alterTableSchema(
+ table, catalogTable.schema.copy(fields = reorderedSchema.toArray))
+
+ Seq.empty[Row]
+ }
+
+ /**
+ * The ALTER TABLE ADD COLUMNS command does not support temporary views/tables,
+ * views, or datasource tables backed by the text or ORC formats or an external
+ * provider. For datasource tables, it currently supports only parquet, json, and csv.
+ */
+ private def verifyAlterTableAddColumn(
+ catalog: SessionCatalog,
+ table: TableIdentifier): CatalogTable = {
+ val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table)
+
+ if (catalogTable.tableType == CatalogTableType.VIEW) {
+ throw new AnalysisException(
+ s"""
+ |ALTER ADD COLUMNS does not support views.
+ |You must drop and re-create the view to add new columns. View: $table
+ """.stripMargin)
+ }
+
+ if (DDLUtils.isDatasourceTable(catalogTable)) {
+ DataSource.lookupDataSource(catalogTable.provider.get).newInstance() match {
+ // For datasource tables, this command supports only the file formats below.
+ // TextFileFormat is excluded because it defaults to a single column "value".
+ // OrcFileFormat is excluded because it cannot yet handle a mismatch between a
+ // user-specified schema and the inferred schema; TODO: once that issue is
+ // resolved, we can add ORC back. Hive formats are already treated as Hive
+ // serde tables, so that logic never reaches this point.
+ case _: JsonFileFormat | _: CSVFileFormat | _: ParquetFileFormat =>
+ case s =>
+ throw new AnalysisException(
+ s"""
+ |ALTER ADD COLUMNS does not support datasource table with type $s.
+ |You must drop and re-create the table to add new columns. Table: $table
+ """.stripMargin)
+ }
+ }
+ catalogTable
+ }
+}
+
+
+/**
* A command that loads data into a Hive table.
*
* The syntax of this command is:
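The schema update in `run` keeps partition columns at the end of the table schema: the new columns are spliced in between the data columns and the partition columns. A small standalone sketch of that reordering (column names are illustrative):

{{{
import org.apache.spark.sql.types._

val dataSchema = StructType(Seq(StructField("c1", IntegerType)))
val partitionSchema = StructType(Seq(StructField("p", StringType)))
val newColumns = Seq(StructField("c2", LongType))

// Mirrors `catalogTable.dataSchema ++ columns ++ catalogTable.partitionSchema`
// followed by `schema.copy(fields = reorderedSchema.toArray)` above.
val reordered = dataSchema ++ newColumns ++ partitionSchema
val newSchema = StructType(reordered.toArray)
assert(newSchema.fieldNames.toSeq == Seq("c1", "c2", "p"))
}}}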
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 4b73b078da..13202a5785 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -780,13 +780,7 @@ class DDLCommandSuite extends PlanTest {
assertUnsupported("ALTER TABLE table_name SKEWED BY (key) ON (1,5,6) STORED AS DIRECTORIES")
}
- test("alter table: add/replace columns (not allowed)") {
- assertUnsupported(
- """
- |ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us')
- |ADD COLUMNS (new_col1 INT COMMENT 'test_comment', new_col2 LONG
- |COMMENT 'test_comment2') CASCADE
- """.stripMargin)
+ test("alter table: replace columns (not allowed)") {
assertUnsupported(
"""
|ALTER TABLE table_name REPLACE COLUMNS (new_col1 INT
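A parser-level counterpart to the suite change above: with the new rule in place, the plain ADD COLUMNS form (the one previously asserted unsupported) should now round-trip through the parser. A minimal sketch, assuming the suite's existing `parser` helper (a `SparkSqlParser`):

{{{
val parsed = parser.parsePlan(
  """
    |ALTER TABLE table_name
    |ADD COLUMNS (new_col1 INT COMMENT 'test_comment', new_col2 LONG COMMENT 'test_comment2')
  """.stripMargin)
assert(parsed.isInstanceOf[AlterTableAddColumnsCommand])
}}}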
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 235c6bf6ad..648b1798c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2185,4 +2185,126 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}
}
+
+ val supportedNativeFileFormatsForAlterTableAddColumns = Seq("parquet", "json", "csv")
+
+ supportedNativeFileFormatsForAlterTableAddColumns.foreach { provider =>
+ test(s"alter datasource table add columns - $provider") {
+ withTable("t1") {
+ sql(s"CREATE TABLE t1 (c1 int) USING $provider")
+ sql("INSERT INTO t1 VALUES (1)")
+ sql("ALTER TABLE t1 ADD COLUMNS (c2 int)")
+ checkAnswer(
+ spark.table("t1"),
+ Seq(Row(1, null))
+ )
+ checkAnswer(
+ sql("SELECT * FROM t1 WHERE c2 is null"),
+ Seq(Row(1, null))
+ )
+
+ sql("INSERT INTO t1 VALUES (3, 2)")
+ checkAnswer(
+ sql("SELECT * FROM t1 WHERE c2 = 2"),
+ Seq(Row(3, 2))
+ )
+ }
+ }
+ }
+
+ supportedNativeFileFormatsForAlterTableAddColumns.foreach { provider =>
+ test(s"alter datasource table add columns - partitioned - $provider") {
+ withTable("t1") {
+ sql(s"CREATE TABLE t1 (c1 int, c2 int) USING $provider PARTITIONED BY (c2)")
+ sql("INSERT INTO t1 PARTITION(c2 = 2) VALUES (1)")
+ sql("ALTER TABLE t1 ADD COLUMNS (c3 int)")
+ checkAnswer(
+ spark.table("t1"),
+ Seq(Row(1, null, 2))
+ )
+ checkAnswer(
+ sql("SELECT * FROM t1 WHERE c3 is null"),
+ Seq(Row(1, null, 2))
+ )
+ sql("INSERT INTO t1 PARTITION(c2 =1) VALUES (2, 3)")
+ checkAnswer(
+ sql("SELECT * FROM t1 WHERE c3 = 3"),
+ Seq(Row(2, 3, 1))
+ )
+ checkAnswer(
+ sql("SELECT * FROM t1 WHERE c2 = 1"),
+ Seq(Row(2, 3, 1))
+ )
+ }
+ }
+ }
+
+ test("alter datasource table add columns - text format not supported") {
+ withTable("t1") {
+ sql("CREATE TABLE t1 (c1 int) USING text")
+ val e = intercept[AnalysisException] {
+ sql("ALTER TABLE t1 ADD COLUMNS (c2 int)")
+ }.getMessage
+ assert(e.contains("ALTER ADD COLUMNS does not support datasource table with type"))
+ }
+ }
+
+ test("alter table add columns -- not support temp view") {
+ withTempView("tmp_v") {
+ sql("CREATE TEMPORARY VIEW tmp_v AS SELECT 1 AS c1, 2 AS c2")
+ val e = intercept[AnalysisException] {
+ sql("ALTER TABLE tmp_v ADD COLUMNS (c3 INT)")
+ }
+ assert(e.message.contains("ALTER ADD COLUMNS does not support views"))
+ }
+ }
+
+ test("alter table add columns -- not support view") {
+ withView("v1") {
+ sql("CREATE VIEW v1 AS SELECT 1 AS c1, 2 AS c2")
+ val e = intercept[AnalysisException] {
+ sql("ALTER TABLE v1 ADD COLUMNS (c3 INT)")
+ }
+ assert(e.message.contains("ALTER ADD COLUMNS does not support views"))
+ }
+ }
+
+ test("alter table add columns with existing column name") {
+ withTable("t1") {
+ sql("CREATE TABLE t1 (c1 int) USING PARQUET")
+ val e = intercept[AnalysisException] {
+ sql("ALTER TABLE t1 ADD COLUMNS (c1 string)")
+ }.getMessage
+ assert(e.contains("Found duplicate column(s)"))
+ }
+ }
+
+ Seq(true, false).foreach { caseSensitive =>
+ test(s"alter table add columns with existing column name - caseSensitive $caseSensitive") {
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> s"$caseSensitive") {
+ withTable("t1") {
+ sql("CREATE TABLE t1 (c1 int) USING PARQUET")
+ if (!caseSensitive) {
+ val e = intercept[AnalysisException] {
+ sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+ }.getMessage
+ assert(e.contains("Found duplicate column(s)"))
+ } else {
+ if (isUsingHiveMetastore) {
+ // The Hive catalog will still complain that c1 is a duplicate column name
+ // because Hive identifiers are case-insensitive.
+ val e = intercept[AnalysisException] {
+ sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+ }.getMessage
+ assert(e.contains("HiveException"))
+ } else {
+ sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+ assert(spark.table("t1").schema
+ .equals(new StructType().add("c1", IntegerType).add("C1", StringType)))
+ }
+ }
+ }
+ }
+ }
+ }
}
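For reference, an end-to-end sketch of the new command against a local session; this assumes Spark 2.x with the default in-memory catalog, and the table and column names are illustrative:

{{{
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("alter-table-add-columns-demo")
  .getOrCreate()

spark.sql("CREATE TABLE demo (c1 INT) USING parquet")
spark.sql("INSERT INTO demo VALUES (1)")
spark.sql("ALTER TABLE demo ADD COLUMNS (c2 INT COMMENT 'added later')")

// Pre-existing rows read back with NULL for the newly added column.
spark.sql("SELECT * FROM demo").show()
}}}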