Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4             3
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala  56
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala  29
3 files changed, 87 insertions(+), 1 deletion(-)
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index cc3b8fd3b4..c4a590ec69 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -85,6 +85,8 @@ statement
LIKE source=tableIdentifier locationSpec? #createTableLike
| ANALYZE TABLE tableIdentifier partitionSpec? COMPUTE STATISTICS
(identifier | FOR COLUMNS identifierSeq)? #analyze
+ | ALTER TABLE tableIdentifier
+ ADD COLUMNS '(' columns=colTypeList ')' #addTableColumns
| ALTER (TABLE | VIEW) from=tableIdentifier
RENAME TO to=tableIdentifier #renameTable
| ALTER (TABLE | VIEW) tableIdentifier
@@ -198,7 +200,6 @@ unsupportedHiveNativeCommands
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=COMPACT
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=CONCATENATE
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=SET kw4=FILEFORMAT
- | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=ADD kw4=COLUMNS
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=REPLACE kw4=COLUMNS
| kw1=START kw2=TRANSACTION
| kw1=COMMIT
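
The rule deleted from unsupportedHiveNativeCommands in the hunk above is the same ALTER TABLE ... ADD COLUMNS shape now added as #addTableColumns, so such statements parse instead of being rejected as unsupported Hive-native commands. A minimal sketch of the accepted syntax, assuming a SparkSession named `spark` (table and column names here are hypothetical):

// Parsed by the new #addTableColumns rule; previously this failed as an
// unsupported Hive-native command.
spark.sql("ALTER TABLE mydb.mytab ADD COLUMNS (c3 INT, c4 STRING)")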
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index b134fd44a3..a469d12451 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View}
import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.types.{StructField, StructType}
object SessionCatalog {
val DEFAULT_DATABASE = "default"
@@ -161,6 +162,20 @@ class SessionCatalog(
throw new TableAlreadyExistsException(db = db, table = name.table)
}
}
+
+ private def checkDuplication(fields: Seq[StructField]): Unit = {
+ val columnNames = if (conf.caseSensitiveAnalysis) {
+ fields.map(_.name)
+ } else {
+ fields.map(_.name.toLowerCase)
+ }
+ if (columnNames.distinct.length != columnNames.length) {
+ val duplicateColumns = columnNames.groupBy(identity).collect {
+ case (x, ys) if ys.length > 1 => x
+ }
+ throw new AnalysisException(s"Found duplicate column(s): ${duplicateColumns.mkString(", ")}")
+ }
+ }
// ----------------------------------------------------------------------------
// Databases
// ----------------------------------------------------------------------------
@@ -296,6 +311,47 @@ class SessionCatalog(
}
/**
+ * Alter the schema of a table identified by the provided table identifier. The new schema
+ * should still contain the existing bucket columns and partition columns used by the table. This
+ * method will also update any Spark SQL-related parameters stored as Hive table properties (such
+ * as the schema itself).
+ *
+ * @param identifier TableIdentifier
+ * @param newSchema Updated schema to be used for the table (must contain existing partition and
+ * bucket columns, and partition columns need to be at the end)
+ */
+ def alterTableSchema(
+ identifier: TableIdentifier,
+ newSchema: StructType): Unit = {
+ val db = formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase))
+ val table = formatTableName(identifier.table)
+ val tableIdentifier = TableIdentifier(table, Some(db))
+ requireDbExists(db)
+ requireTableExists(tableIdentifier)
+ checkDuplication(newSchema)
+
+ val catalogTable = externalCatalog.getTable(db, table)
+ val oldSchema = catalogTable.schema
+
+ // not supporting dropping columns yet
+ val nonExistentColumnNames = oldSchema.map(_.name).filterNot(columnNameResolved(newSchema, _))
+ if (nonExistentColumnNames.nonEmpty) {
+ throw new AnalysisException(
+ s"""
+ |Some existing schema fields (${nonExistentColumnNames.mkString("[", ",", "]")}) are
+ |not present in the new schema. We don't support dropping columns yet.
+ """.stripMargin)
+ }
+
+ // assuming the newSchema has all partition columns at the end as required
+ externalCatalog.alterTableSchema(db, table, newSchema)
+ }
+
+ private def columnNameResolved(schema: StructType, colName: String): Boolean = {
+ schema.fields.map(_.name).exists(conf.resolver(_, colName))
+ }
+
+ /**
* Return whether a table/view with the specified name exists. If no database is specified, check
* with current database.
*/
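
To make the calling convention concrete, here is a minimal sketch of driving the new alterTableSchema method, modeled on the test added below; it assumes an in-scope SessionCatalog named `sessionCatalog` and an existing table default.t1. New fields are appended to the data schema so that partition columns remain at the end, as the method requires:

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.types.{IntegerType, StructType}

val ident = TableIdentifier("t1", Some("default"))
val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
// Append the new column after the existing data columns and keep the
// partition columns last, since alterTableSchema does not reorder fields.
val widened = StructType(oldTab.dataSchema.add("c3", IntegerType) ++ oldTab.partitionSchema)
sessionCatalog.alterTableSchema(ident, widened)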
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index fd9e5d6bb1..ca4ce1c117 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View}
+import org.apache.spark.sql.types._
class InMemorySessionCatalogSuite extends SessionCatalogSuite {
protected val utils = new CatalogTestUtils {
@@ -448,6 +449,34 @@ abstract class SessionCatalogSuite extends PlanTest {
}
}
+ test("alter table add columns") {
+ withBasicCatalog { sessionCatalog =>
+ sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
+ val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+ sessionCatalog.alterTableSchema(
+ TableIdentifier("t1", Some("default")),
+ StructType(oldTab.dataSchema.add("c3", IntegerType) ++ oldTab.partitionSchema))
+
+ val newTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+ // construct the expected table schema
+ val expectedTableSchema = StructType(oldTab.dataSchema.fields ++
+ Seq(StructField("c3", IntegerType)) ++ oldTab.partitionSchema)
+ assert(newTab.schema == expectedTableSchema)
+ }
+ }
+
+ test("alter table drop columns") {
+ withBasicCatalog { sessionCatalog =>
+ sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
+ val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+ val e = intercept[AnalysisException] {
+ sessionCatalog.alterTableSchema(
+ TableIdentifier("t1", Some("default")), StructType(oldTab.schema.drop(1)))
+ }.getMessage
+ assert(e.contains("We don't support dropping columns yet."))
+ }
+ }
+
test("get table") {
withBasicCatalog { catalog =>
assert(catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))
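
Beyond the two tests above, the duplicate-column guard (checkDuplication) could be exercised in the same style. The sketch below is illustrative rather than part of the patch, and assumes the suite's default case-insensitive analysis, under which "c3" and "C3" collide:

test("alter table add duplicate columns") {
  withBasicCatalog { sessionCatalog =>
    sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
    val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
    val e = intercept[AnalysisException] {
      sessionCatalog.alterTableSchema(
        TableIdentifier("t1", Some("default")),
        // "c3" and "C3" differ only by case, so checkDuplication rejects them.
        StructType(oldTab.schema.add("c3", IntegerType).add("C3", IntegerType)))
    }.getMessage
    assert(e.contains("Found duplicate column(s)"))
  }
}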