path: root/sql/catalyst
author    Xin Wu <xinwu@us.ibm.com>    2017-03-21 08:49:54 -0700
committer Xiao Li <gatorsmile@gmail.com>    2017-03-21 08:49:54 -0700
commit    4c0ff5f58565f811b65f1a11b6121da007bcbd5f (patch)
tree      d7c213a0d24f23ffd7203c0ce22c619e5ac3bc5c /sql/catalyst
parent    63f077fbe50b4094340e9915db41d7dbdba52975 (diff)
download  spark-4c0ff5f58565f811b65f1a11b6121da007bcbd5f.tar.gz
spark-4c0ff5f58565f811b65f1a11b6121da007bcbd5f.tar.bz2
spark-4c0ff5f58565f811b65f1a11b6121da007bcbd5f.zip
[SPARK-19261][SQL] Alter add columns for Hive serde and some datasource tables
## What changes were proposed in this pull request?

Support `ALTER TABLE ADD COLUMNS (...)` syntax for Hive serde and some datasource tables. This PR considers the following aspects:

1. Views are not supported by `ALTER TABLE ADD COLUMNS`.
2. Since tables created in Spark SQL with Hive DDL syntax populate the table properties with schema information, we need to make sure the schema stays consistent before and after the ALTER operation so it remains usable afterwards.
3. For embedded-schema formats such as `parquet`, predicates on the newly added columns must be evaluated, or pushed down, correctly. If a data file does not contain the newly added columns, such predicates should behave as if the column values were NULL.
4. For datasource tables, this feature does not support the following:
   * 4.1 The TEXT format, since only one default column `value` is inferred for text data.
   * 4.2 The ORC format, since the Spark SQL native ORC reader does not support a user-specified schema that differs from the schema inferred from the ORC files.
   * 4.3 Third-party datasource types that implement `RelationProvider`, including the built-in JDBC format, since different vendor implementations may deal with the schema in different ways.
   * 4.4 Other datasource types, such as `parquet`, `json`, `csv`, and `hive`, are supported.
5. Column names being added cannot duplicate any existing data column or partition column name. Case sensitivity is taken into account according to the SQL configuration.
6. This feature also supports the in-memory catalog when Hive support is turned off.

## How was this patch tested?

New test cases were added.

Author: Xin Wu <xinwu@us.ibm.com>

Closes #16626 from xwu0226/alter_add_columns.
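To make the new syntax concrete, here is a minimal sketch of the intended behavior, assuming a running `SparkSession` named `spark`; the table and column names (`t1`, `c1`, `c2`) are illustrative only:

```scala
// Illustrative sketch of the feature; names are hypothetical.
// A parquet datasource table with a single column.
spark.sql("CREATE TABLE t1 (c1 INT) USING parquet")
spark.sql("INSERT INTO t1 VALUES (1)")

// New in this patch: add a column to an existing table.
spark.sql("ALTER TABLE t1 ADD COLUMNS (c2 INT)")

// Rows written before the ALTER have no c2 in their data files, so a
// predicate on c2 should evaluate as if those values were NULL (point 3).
spark.sql("SELECT * FROM t1 WHERE c2 IS NULL").show()
```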
Diffstat (limited to 'sql/catalyst')
-rw-r--r-- sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 |  3
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 56
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala | 29
3 files changed, 87 insertions(+), 1 deletion(-)
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index cc3b8fd3b4..c4a590ec69 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -85,6 +85,8 @@ statement
LIKE source=tableIdentifier locationSpec? #createTableLike
| ANALYZE TABLE tableIdentifier partitionSpec? COMPUTE STATISTICS
(identifier | FOR COLUMNS identifierSeq)? #analyze
+ | ALTER TABLE tableIdentifier
+ ADD COLUMNS '(' columns=colTypeList ')' #addTableColumns
| ALTER (TABLE | VIEW) from=tableIdentifier
RENAME TO to=tableIdentifier #renameTable
| ALTER (TABLE | VIEW) tableIdentifier
@@ -198,7 +200,6 @@ unsupportedHiveNativeCommands
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=COMPACT
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=CONCATENATE
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=SET kw4=FILEFORMAT
- | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=ADD kw4=COLUMNS
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=REPLACE kw4=COLUMNS
| kw1=START kw2=TRANSACTION
| kw1=COMMIT
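The net effect of the grammar change is that `ALTER TABLE ... ADD COLUMNS` is now parsed by the new `#addTableColumns` rule, while `REPLACE COLUMNS` remains on the `unsupportedHiveNativeCommands` list. A hedged sketch, again assuming a `SparkSession` named `spark` and an existing table `t1`:

```scala
// Sketch contrasting what the revised grammar accepts vs. still rejects.
import org.apache.spark.sql.catalyst.parser.ParseException

// Parsed by the new #addTableColumns rule.
spark.sql("ALTER TABLE t1 ADD COLUMNS (c2 STRING, c3 INT)")

// Still matched by unsupportedHiveNativeCommands, so parsing should fail.
try {
  spark.sql("ALTER TABLE t1 REPLACE COLUMNS (c2 STRING)")
} catch {
  case e: ParseException => println(s"Rejected as expected: ${e.getMessage}")
}
```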
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index b134fd44a3..a469d12451 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View}
import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.types.{StructField, StructType}
object SessionCatalog {
val DEFAULT_DATABASE = "default"
@@ -161,6 +162,20 @@ class SessionCatalog(
throw new TableAlreadyExistsException(db = db, table = name.table)
}
}
+
+ private def checkDuplication(fields: Seq[StructField]): Unit = {
+ val columnNames = if (conf.caseSensitiveAnalysis) {
+ fields.map(_.name)
+ } else {
+ fields.map(_.name.toLowerCase)
+ }
+ if (columnNames.distinct.length != columnNames.length) {
+ val duplicateColumns = columnNames.groupBy(identity).collect {
+ case (x, ys) if ys.length > 1 => x
+ }
+ throw new AnalysisException(s"Found duplicate column(s): ${duplicateColumns.mkString(", ")}")
+ }
+ }
// ----------------------------------------------------------------------------
// Databases
// ----------------------------------------------------------------------------
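The `checkDuplication` helper above normalizes names before counting, so case sensitivity decides what counts as a duplicate. A self-contained sketch that mirrors the helper's logic (the method is private, so this re-implements it rather than calling it):

```scala
// Standalone mirror of checkDuplication's duplicate detection.
import org.apache.spark.sql.types.{IntegerType, StructField}

def findDuplicates(fields: Seq[StructField], caseSensitive: Boolean): Seq[String] = {
  val columnNames =
    if (caseSensitive) fields.map(_.name) else fields.map(_.name.toLowerCase)
  // Keep only the names that occur more than once.
  columnNames.groupBy(identity).collect {
    case (name, occurrences) if occurrences.length > 1 => name
  }.toSeq
}

val fields = Seq(StructField("id", IntegerType), StructField("ID", IntegerType))

// Case-sensitive analysis: "id" and "ID" are distinct, so no duplicates.
assert(findDuplicates(fields, caseSensitive = true).isEmpty)
// Case-insensitive analysis: both normalize to "id" and are flagged.
assert(findDuplicates(fields, caseSensitive = false) == Seq("id"))
```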
@@ -296,6 +311,47 @@ class SessionCatalog(
}
/**
+ * Alter the schema of a table identified by the provided table identifier. The new schema
+ * should still contain the existing bucket columns and partition columns used by the table. This
+ * method will also update any Spark SQL-related parameters stored as Hive table properties (such
+ * as the schema itself).
+ *
+ * @param identifier TableIdentifier
+ * @param newSchema Updated schema to be used for the table (must contain existing partition and
+ * bucket columns, and partition columns need to be at the end)
+ */
+ def alterTableSchema(
+ identifier: TableIdentifier,
+ newSchema: StructType): Unit = {
+ val db = formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase))
+ val table = formatTableName(identifier.table)
+ val tableIdentifier = TableIdentifier(table, Some(db))
+ requireDbExists(db)
+ requireTableExists(tableIdentifier)
+ checkDuplication(newSchema)
+
+ val catalogTable = externalCatalog.getTable(db, table)
+ val oldSchema = catalogTable.schema
+
+ // not supporting dropping columns yet
+ val nonExistentColumnNames = oldSchema.map(_.name).filterNot(columnNameResolved(newSchema, _))
+ if (nonExistentColumnNames.nonEmpty) {
+ throw new AnalysisException(
+ s"""
+ |Some existing schema fields (${nonExistentColumnNames.mkString("[", ",", "]")}) are
+ |not present in the new schema. We don't support dropping columns yet.
+ """.stripMargin)
+ }
+
+ // assuming the newSchema has all partition columns at the end as required
+ externalCatalog.alterTableSchema(db, table, newSchema)
+ }
+
+ private def columnNameResolved(schema: StructType, colName: String): Boolean = {
+ schema.fields.map(_.name).exists(conf.resolver(_, colName))
+ }
+
+ /**
* Return whether a table/view with the specified name exists. If no database is specified, check
* with current database.
*/
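The guard against dropping columns is built on `columnNameResolved`, which asks whether any field name in the new schema matches under `conf.resolver`, a plain `(String, String) => Boolean`. A minimal sketch of the check, with a local case-insensitive resolver standing in for the one Spark selects from the configuration:

```scala
// Local stand-in for conf.resolver and the columnNameResolved check.
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val caseInsensitiveResolver: (String, String) => Boolean =
  (a: String, b: String) => a.equalsIgnoreCase(b)

def columnNameResolved(schema: StructType, colName: String): Boolean =
  schema.fields.map(_.name).exists(caseInsensitiveResolver(_, colName))

val newSchema = StructType(Seq(StructField("ID", IntegerType), StructField("c2", StringType)))

// Every old column must still resolve against the new schema; otherwise
// alterTableSchema rejects the change as an unsupported column drop.
val oldColumns = Seq("id", "c2")
val missing = oldColumns.filterNot(columnNameResolved(newSchema, _))
assert(missing.isEmpty) // "id" matches "ID" under the case-insensitive resolver
```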
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index fd9e5d6bb1..ca4ce1c117 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View}
+import org.apache.spark.sql.types._
class InMemorySessionCatalogSuite extends SessionCatalogSuite {
protected val utils = new CatalogTestUtils {
@@ -448,6 +449,34 @@ abstract class SessionCatalogSuite extends PlanTest {
}
}
+ test("alter table add columns") {
+ withBasicCatalog { sessionCatalog =>
+ sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
+ val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+ sessionCatalog.alterTableSchema(
+ TableIdentifier("t1", Some("default")),
+ StructType(oldTab.dataSchema.add("c3", IntegerType) ++ oldTab.partitionSchema))
+
+ val newTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+ // construct the expected table schema
+ val expectedTableSchema = StructType(oldTab.dataSchema.fields ++
+ Seq(StructField("c3", IntegerType)) ++ oldTab.partitionSchema)
+ assert(newTab.schema == expectedTableSchema)
+ }
+ }
+
+ test("alter table drop columns") {
+ withBasicCatalog { sessionCatalog =>
+ sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
+ val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+ val e = intercept[AnalysisException] {
+ sessionCatalog.alterTableSchema(
+ TableIdentifier("t1", Some("default")), StructType(oldTab.schema.drop(1)))
+ }.getMessage
+ assert(e.contains("We don't support dropping columns yet."))
+ }
+ }
+
test("get table") {
withBasicCatalog { catalog =>
assert(catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))