aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src
diff options
context:
space:
mode:
authorXin Wu <xinwu@us.ibm.com>2017-03-21 08:49:54 -0700
committerXiao Li <gatorsmile@gmail.com>2017-03-21 08:49:54 -0700
commit4c0ff5f58565f811b65f1a11b6121da007bcbd5f (patch)
treed7c213a0d24f23ffd7203c0ce22c619e5ac3bc5c /sql/core/src
parent63f077fbe50b4094340e9915db41d7dbdba52975 (diff)
downloadspark-4c0ff5f58565f811b65f1a11b6121da007bcbd5f.tar.gz
spark-4c0ff5f58565f811b65f1a11b6121da007bcbd5f.tar.bz2
spark-4c0ff5f58565f811b65f1a11b6121da007bcbd5f.zip
[SPARK-19261][SQL] Alter add columns for Hive serde and some datasource tables
## What changes were proposed in this pull request? Support` ALTER TABLE ADD COLUMNS (...) `syntax for Hive serde and some datasource tables. In this PR, we consider a few aspects: 1. View is not supported for `ALTER ADD COLUMNS` 2. Since tables created in SparkSQL with Hive DDL syntax will populate table properties with schema information, we need make sure the consistency of the schema before and after ALTER operation in order for future use. 3. For embedded-schema type of format, such as `parquet`, we need to make sure that the predicate on the newly-added columns can be evaluated properly, or pushed down properly. In case of the data file does not have the columns for the newly-added columns, such predicates should return as if the column values are NULLs. 4. For datasource table, this feature does not support the following: 4.1 TEXT format, since there is only one default column `value` is inferred for text format data. 4.2 ORC format, since SparkSQL native ORC reader does not support the difference between user-specified-schema and inferred schema from ORC files. 4.3 Third party datasource types that implements RelationProvider, including the built-in JDBC format, since different implementations by the vendors may have different ways to dealing with schema. 4.4 Other datasource types, such as `parquet`, `json`, `csv`, `hive` are supported. 5. Column names being added can not be duplicate of any existing data column or partition column names. Case sensitivity is taken into consideration according to the sql configuration. 6. This feature also supports In-Memory catalog, while Hive support is turned off. ## How was this patch tested? Add new test cases Author: Xin Wu <xinwu@us.ibm.com> Closes #16626 from xwu0226/alter_add_columns.
Diffstat (limited to 'sql/core/src')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala16
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala76
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala8
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala122
4 files changed, 214 insertions, 8 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index abea7a3bcf..d4f23f9dd5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -742,6 +742,22 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
/**
+ * Create a [[AlterTableAddColumnsCommand]] command.
+ *
+ * For example:
+ * {{{
+ * ALTER TABLE table1
+ * ADD COLUMNS (col_name data_type [COMMENT col_comment], ...);
+ * }}}
+ */
+ override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = withOrigin(ctx) {
+ AlterTableAddColumnsCommand(
+ visitTableIdentifier(ctx.tableIdentifier),
+ visitColTypeList(ctx.columns)
+ )
+ }
+
+ /**
* Create an [[AlterTableSetPropertiesCommand]] command.
*
* For example:
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index beb3dcafd6..93307fc883 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -37,7 +37,10 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, PartitioningUtils}
+import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
+import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -175,6 +178,77 @@ case class AlterTableRenameCommand(
}
/**
+ * A command that add columns to a table
+ * The syntax of using this command in SQL is:
+ * {{{
+ * ALTER TABLE table_identifier
+ * ADD COLUMNS (col_name data_type [COMMENT col_comment], ...);
+ * }}}
+*/
+case class AlterTableAddColumnsCommand(
+ table: TableIdentifier,
+ columns: Seq[StructField]) extends RunnableCommand {
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val catalog = sparkSession.sessionState.catalog
+ val catalogTable = verifyAlterTableAddColumn(catalog, table)
+
+ try {
+ sparkSession.catalog.uncacheTable(table.quotedString)
+ } catch {
+ case NonFatal(e) =>
+ log.warn(s"Exception when attempting to uncache table ${table.quotedString}", e)
+ }
+ catalog.refreshTable(table)
+
+ // make sure any partition columns are at the end of the fields
+ val reorderedSchema = catalogTable.dataSchema ++ columns ++ catalogTable.partitionSchema
+ catalog.alterTableSchema(
+ table, catalogTable.schema.copy(fields = reorderedSchema.toArray))
+
+ Seq.empty[Row]
+ }
+
+ /**
+ * ALTER TABLE ADD COLUMNS command does not support temporary view/table,
+ * view, or datasource table with text, orc formats or external provider.
+ * For datasource table, it currently only supports parquet, json, csv.
+ */
+ private def verifyAlterTableAddColumn(
+ catalog: SessionCatalog,
+ table: TableIdentifier): CatalogTable = {
+ val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table)
+
+ if (catalogTable.tableType == CatalogTableType.VIEW) {
+ throw new AnalysisException(
+ s"""
+ |ALTER ADD COLUMNS does not support views.
+ |You must drop and re-create the views for adding the new columns. Views: $table
+ """.stripMargin)
+ }
+
+ if (DDLUtils.isDatasourceTable(catalogTable)) {
+ DataSource.lookupDataSource(catalogTable.provider.get).newInstance() match {
+ // For datasource table, this command can only support the following File format.
+ // TextFileFormat only default to one column "value"
+ // OrcFileFormat can not handle difference between user-specified schema and
+ // inferred schema yet. TODO, once this issue is resolved , we can add Orc back.
+ // Hive type is already considered as hive serde table, so the logic will not
+ // come in here.
+ case _: JsonFileFormat | _: CSVFileFormat | _: ParquetFileFormat =>
+ case s =>
+ throw new AnalysisException(
+ s"""
+ |ALTER ADD COLUMNS does not support datasource table with type $s.
+ |You must drop and re-create the table for adding the new columns. Tables: $table
+ """.stripMargin)
+ }
+ }
+ catalogTable
+ }
+}
+
+
+/**
* A command that loads data into a Hive table.
*
* The syntax of this command is:
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 4b73b078da..13202a5785 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -780,13 +780,7 @@ class DDLCommandSuite extends PlanTest {
assertUnsupported("ALTER TABLE table_name SKEWED BY (key) ON (1,5,6) STORED AS DIRECTORIES")
}
- test("alter table: add/replace columns (not allowed)") {
- assertUnsupported(
- """
- |ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us')
- |ADD COLUMNS (new_col1 INT COMMENT 'test_comment', new_col2 LONG
- |COMMENT 'test_comment2') CASCADE
- """.stripMargin)
+ test("alter table: replace columns (not allowed)") {
assertUnsupported(
"""
|ALTER TABLE table_name REPLACE COLUMNS (new_col1 INT
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 235c6bf6ad..648b1798c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2185,4 +2185,126 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}
}
+
+ val supportedNativeFileFormatsForAlterTableAddColumns = Seq("parquet", "json", "csv")
+
+ supportedNativeFileFormatsForAlterTableAddColumns.foreach { provider =>
+ test(s"alter datasource table add columns - $provider") {
+ withTable("t1") {
+ sql(s"CREATE TABLE t1 (c1 int) USING $provider")
+ sql("INSERT INTO t1 VALUES (1)")
+ sql("ALTER TABLE t1 ADD COLUMNS (c2 int)")
+ checkAnswer(
+ spark.table("t1"),
+ Seq(Row(1, null))
+ )
+ checkAnswer(
+ sql("SELECT * FROM t1 WHERE c2 is null"),
+ Seq(Row(1, null))
+ )
+
+ sql("INSERT INTO t1 VALUES (3, 2)")
+ checkAnswer(
+ sql("SELECT * FROM t1 WHERE c2 = 2"),
+ Seq(Row(3, 2))
+ )
+ }
+ }
+ }
+
+ supportedNativeFileFormatsForAlterTableAddColumns.foreach { provider =>
+ test(s"alter datasource table add columns - partitioned - $provider") {
+ withTable("t1") {
+ sql(s"CREATE TABLE t1 (c1 int, c2 int) USING $provider PARTITIONED BY (c2)")
+ sql("INSERT INTO t1 PARTITION(c2 = 2) VALUES (1)")
+ sql("ALTER TABLE t1 ADD COLUMNS (c3 int)")
+ checkAnswer(
+ spark.table("t1"),
+ Seq(Row(1, null, 2))
+ )
+ checkAnswer(
+ sql("SELECT * FROM t1 WHERE c3 is null"),
+ Seq(Row(1, null, 2))
+ )
+ sql("INSERT INTO t1 PARTITION(c2 =1) VALUES (2, 3)")
+ checkAnswer(
+ sql("SELECT * FROM t1 WHERE c3 = 3"),
+ Seq(Row(2, 3, 1))
+ )
+ checkAnswer(
+ sql("SELECT * FROM t1 WHERE c2 = 1"),
+ Seq(Row(2, 3, 1))
+ )
+ }
+ }
+ }
+
+ test("alter datasource table add columns - text format not supported") {
+ withTable("t1") {
+ sql("CREATE TABLE t1 (c1 int) USING text")
+ val e = intercept[AnalysisException] {
+ sql("ALTER TABLE t1 ADD COLUMNS (c2 int)")
+ }.getMessage
+ assert(e.contains("ALTER ADD COLUMNS does not support datasource table with type"))
+ }
+ }
+
+ test("alter table add columns -- not support temp view") {
+ withTempView("tmp_v") {
+ sql("CREATE TEMPORARY VIEW tmp_v AS SELECT 1 AS c1, 2 AS c2")
+ val e = intercept[AnalysisException] {
+ sql("ALTER TABLE tmp_v ADD COLUMNS (c3 INT)")
+ }
+ assert(e.message.contains("ALTER ADD COLUMNS does not support views"))
+ }
+ }
+
+ test("alter table add columns -- not support view") {
+ withView("v1") {
+ sql("CREATE VIEW v1 AS SELECT 1 AS c1, 2 AS c2")
+ val e = intercept[AnalysisException] {
+ sql("ALTER TABLE v1 ADD COLUMNS (c3 INT)")
+ }
+ assert(e.message.contains("ALTER ADD COLUMNS does not support views"))
+ }
+ }
+
+ test("alter table add columns with existing column name") {
+ withTable("t1") {
+ sql("CREATE TABLE t1 (c1 int) USING PARQUET")
+ val e = intercept[AnalysisException] {
+ sql("ALTER TABLE t1 ADD COLUMNS (c1 string)")
+ }.getMessage
+ assert(e.contains("Found duplicate column(s)"))
+ }
+ }
+
+ Seq(true, false).foreach { caseSensitive =>
+ test(s"alter table add columns with existing column name - caseSensitive $caseSensitive") {
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> s"$caseSensitive") {
+ withTable("t1") {
+ sql("CREATE TABLE t1 (c1 int) USING PARQUET")
+ if (!caseSensitive) {
+ val e = intercept[AnalysisException] {
+ sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+ }.getMessage
+ assert(e.contains("Found duplicate column(s)"))
+ } else {
+ if (isUsingHiveMetastore) {
+ // hive catalog will still complains that c1 is duplicate column name because hive
+ // identifiers are case insensitive.
+ val e = intercept[AnalysisException] {
+ sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+ }.getMessage
+ assert(e.contains("HiveException"))
+ } else {
+ sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+ assert(spark.table("t1").schema
+ .equals(new StructType().add("c1", IntegerType).add("C1", StringType)))
+ }
+ }
+ }
+ }
+ }
+ }
}