 sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala      | 26 ++++++++++++++------
 sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala | 64 ++++++++++++++++++++++
 2 files changed, 84 insertions(+), 6 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 95bac94996..5fd0b83cf0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -293,7 +293,7 @@ case class AlterTableSerDePropertiesCommand(
     tableName: TableIdentifier,
     serdeClassName: Option[String],
     serdeProperties: Option[Map[String, String]],
-    partition: Option[Map[String, String]])
+    partSpec: Option[TablePartitionSpec])
   extends RunnableCommand {
 
   // should never happen if we parsed things correctly
@@ -306,15 +306,29 @@ case class AlterTableSerDePropertiesCommand(
         "ALTER TABLE SERDEPROPERTIES")
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
-    // Do not support setting serde for datasource tables
+    // For datasource tables, disallow setting serde or specifying partition
+    if (partSpec.isDefined && DDLUtils.isDatasourceTable(table)) {
+      throw new AnalysisException("Operation not allowed: ALTER TABLE SET " +
+        "[SERDE | SERDEPROPERTIES] for a specific partition is not supported " +
+        "for tables created with the datasource API")
+    }
     if (serdeClassName.isDefined && DDLUtils.isDatasourceTable(table)) {
       throw new AnalysisException("Operation not allowed: ALTER TABLE SET SERDE is " +
         "not supported for tables created with the datasource API")
     }
-    val newTable = table.withNewStorage(
-      serde = serdeClassName.orElse(table.storage.serde),
-      serdeProperties = table.storage.serdeProperties ++ serdeProperties.getOrElse(Map()))
-    catalog.alterTable(newTable)
+    if (partSpec.isEmpty) {
+      val newTable = table.withNewStorage(
+        serde = serdeClassName.orElse(table.storage.serde),
+        serdeProperties = table.storage.serdeProperties ++ serdeProperties.getOrElse(Map()))
+      catalog.alterTable(newTable)
+    } else {
+      val spec = partSpec.get
+      val part = catalog.getPartition(tableName, spec)
+      val newPart = part.copy(storage = part.storage.copy(
+        serde = serdeClassName.orElse(part.storage.serde),
+        serdeProperties = part.storage.serdeProperties ++ serdeProperties.getOrElse(Map())))
+      catalog.alterPartitions(tableName, Seq(newPart))
+    }
     Seq.empty[Row]
   }
 
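The change above splits ALTER TABLE ... SET [SERDE | SERDEPROPERTIES] into two paths: with no PARTITION clause it goes through table.withNewStorage and catalog.alterTable as before, and with a partition spec it goes through catalog.getPartition and catalog.alterPartitions. Note the check ordering: the partition-spec check fires before the SET SERDE check, so a datasource table hitting both gets the partition-specific error. A minimal usage sketch of the SQL this enables, assuming a Hive-enabled SparkSession; the table name and serde properties are illustrative, not from this patch:

import org.apache.spark.sql.SparkSession

// Sketch only: exercises both code paths added above.
val spark = SparkSession.builder()
  .appName("serde-partition-sketch")
  .enableHiveSupport() // Hive-serde tables only; datasource tables throw AnalysisException
  .getOrCreate()

spark.sql("CREATE TABLE boxes (width INT) PARTITIONED BY (ds STRING)")
spark.sql("ALTER TABLE boxes ADD PARTITION (ds = '2016-01-01')")

// No PARTITION clause: table-level storage is updated via catalog.alterTable
spark.sql("ALTER TABLE boxes SET SERDEPROPERTIES ('compress' = 'true')")

// PARTITION clause: partition-level storage is updated via catalog.alterPartitions
spark.sql("ALTER TABLE boxes PARTITION (ds = '2016-01-01') " +
  "SET SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' " +
  "WITH SERDEPROPERTIES ('field.delim' = ',')")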
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index ccb4006483..5d45cfb501 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -538,6 +538,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     testSetSerde(isDatasourceTable = true)
   }
 
+  test("alter table: set serde partition") {
+    testSetSerdePartition(isDatasourceTable = false)
+  }
+
+  test("alter table: set serde partition (datasource table)") {
+    testSetSerdePartition(isDatasourceTable = true)
+  }
+
   test("alter table: bucketing is not supported") {
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
@@ -931,6 +939,62 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     assert(e.getMessage.contains(DATASOURCE_PREFIX + "foo"))
   }
 
+  private def testSetSerdePartition(isDatasourceTable: Boolean): Unit = {
+    val catalog = spark.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1", Some("dbx"))
+    val spec = Map("a" -> "1", "b" -> "2")
+    createDatabase(catalog, "dbx")
+    createTable(catalog, tableIdent)
+    createTablePartition(catalog, spec, tableIdent)
+    createTablePartition(catalog, Map("a" -> "1", "b" -> "3"), tableIdent)
+    createTablePartition(catalog, Map("a" -> "2", "b" -> "2"), tableIdent)
+    createTablePartition(catalog, Map("a" -> "2", "b" -> "3"), tableIdent)
+    if (isDatasourceTable) {
+      convertToDatasourceTable(catalog, tableIdent)
+    }
+    assert(catalog.getPartition(tableIdent, spec).storage.serde.isEmpty)
+    assert(catalog.getPartition(tableIdent, spec).storage.serdeProperties.isEmpty)
+    // set table serde and/or properties (should fail on datasource tables)
+    if (isDatasourceTable) {
+      val e1 = intercept[AnalysisException] {
+        sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 'whatever'")
+      }
+      val e2 = intercept[AnalysisException] {
+        sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 'org.apache.madoop' " +
+          "WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
+      }
+      assert(e1.getMessage.contains("datasource"))
+      assert(e2.getMessage.contains("datasource"))
+    } else {
+      sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 'org.apache.jadoop'")
+      assert(catalog.getPartition(tableIdent, spec).storage.serde == Some("org.apache.jadoop"))
+      assert(catalog.getPartition(tableIdent, spec).storage.serdeProperties.isEmpty)
+      sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 'org.apache.madoop' " +
+        "WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
+      assert(catalog.getPartition(tableIdent, spec).storage.serde == Some("org.apache.madoop"))
+      assert(catalog.getPartition(tableIdent, spec).storage.serdeProperties ==
+        Map("k" -> "v", "kay" -> "vee"))
+    }
+    // set serde properties only
+    maybeWrapException(isDatasourceTable) {
+      sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) " +
+        "SET SERDEPROPERTIES ('k' = 'vvv', 'kay' = 'vee')")
+      assert(catalog.getPartition(tableIdent, spec).storage.serdeProperties ==
+        Map("k" -> "vvv", "kay" -> "vee"))
+    }
+    // set things without explicitly specifying database
+    catalog.setCurrentDatabase("dbx")
+    maybeWrapException(isDatasourceTable) {
+      sql("ALTER TABLE tab1 PARTITION (a=1, b=2) SET SERDEPROPERTIES ('kay' = 'veee')")
+      assert(catalog.getPartition(tableIdent, spec).storage.serdeProperties ==
+        Map("k" -> "vvv", "kay" -> "veee"))
+    }
+    // table to alter does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE does_not_exist SET SERDEPROPERTIES ('x' = 'y')")
+    }
+  }
+
   private def testAddPartitions(isDatasourceTable: Boolean): Unit = {
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
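The new test relies on DDLSuite helpers defined outside this hunk (createDatabase, createTable, createTablePartition, convertToDatasourceTable, maybeWrapException). For readability, here is a sketch of the contract maybeWrapException has to satisfy in the test above; this is an assumption about the helper's shape, not code from this patch:

// Assumed helper shape: run the body as-is for Hive-serde tables, but expect
// the AnalysisException raised for datasource tables when the flag is set.
private def maybeWrapException[T](expectException: Boolean)(body: => T): Unit = {
  if (expectException) intercept[AnalysisException] { body } else body
}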