From 4a2fb8b87ca4517e0f4a1d7a1a1b3c08c1c1294d Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Fri, 27 May 2016 17:27:24 -0700
Subject: [SPARK-15594][SQL] ALTER TABLE SERDEPROPERTIES does not respect partition spec

## What changes were proposed in this pull request?

These commands ignore the partition spec and change the storage properties of the table itself:

```
ALTER TABLE table_name PARTITION (a=1, b=2) SET SERDE 'my_serde'
ALTER TABLE table_name PARTITION (a=1, b=2) SET SERDEPROPERTIES ('key1'='val1')
```

Now they change the storage properties of the specified partition.

## How was this patch tested?

DDLSuite

Author: Andrew Or

Closes #13343 from andrewor14/alter-table-serdeproperties.
---
 .../apache/spark/sql/execution/command/ddl.scala   | 26 ++++++++++++++++++++------
 .../spark/sql/execution/command/DDLSuite.scala     | 64 ++++++++++++++++++++++
 2 files changed, 84 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 95bac94996..5fd0b83cf0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -293,7 +293,7 @@ case class AlterTableSerDePropertiesCommand(
     tableName: TableIdentifier,
     serdeClassName: Option[String],
     serdeProperties: Option[Map[String, String]],
-    partition: Option[Map[String, String]])
+    partSpec: Option[TablePartitionSpec])
   extends RunnableCommand {
 
   // should never happen if we parsed things correctly
@@ -306,15 +306,29 @@ case class AlterTableSerDePropertiesCommand(
       "ALTER TABLE SERDEPROPERTIES")
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
-    // Do not support setting serde for datasource tables
+    // For datasource tables, disallow setting serde or specifying partition
+    if (partSpec.isDefined && DDLUtils.isDatasourceTable(table)) {
+      throw new AnalysisException("Operation not allowed: ALTER TABLE SET " +
+        "[SERDE | SERDEPROPERTIES] for a specific partition is not supported " +
+        "for tables created with the datasource API")
+    }
     if (serdeClassName.isDefined && DDLUtils.isDatasourceTable(table)) {
       throw new AnalysisException("Operation not allowed: ALTER TABLE SET SERDE is " +
         "not supported for tables created with the datasource API")
     }
-    val newTable = table.withNewStorage(
-      serde = serdeClassName.orElse(table.storage.serde),
-      serdeProperties = table.storage.serdeProperties ++ serdeProperties.getOrElse(Map()))
-    catalog.alterTable(newTable)
+    if (partSpec.isEmpty) {
+      val newTable = table.withNewStorage(
+        serde = serdeClassName.orElse(table.storage.serde),
+        serdeProperties = table.storage.serdeProperties ++ serdeProperties.getOrElse(Map()))
+      catalog.alterTable(newTable)
+    } else {
+      val spec = partSpec.get
+      val part = catalog.getPartition(tableName, spec)
+      val newPart = part.copy(storage = part.storage.copy(
+        serde = serdeClassName.orElse(part.storage.serde),
+        serdeProperties = part.storage.serdeProperties ++ serdeProperties.getOrElse(Map())))
+      catalog.alterPartitions(tableName, Seq(newPart))
+    }
     Seq.empty[Row]
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index ccb4006483..5d45cfb501 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -538,6 +538,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     testSetSerde(isDatasourceTable = true)
   }
 
+  test("alter table: set serde partition") {
+    testSetSerdePartition(isDatasourceTable = false)
+  }
+
+  test("alter table: set serde partition (datasource table)") {
+    testSetSerdePartition(isDatasourceTable = true)
+  }
+
   test("alter table: bucketing is not supported") {
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
@@ -931,6 +939,62 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     assert(e.getMessage.contains(DATASOURCE_PREFIX + "foo"))
   }
 
+  private def testSetSerdePartition(isDatasourceTable: Boolean): Unit = {
+    val catalog = spark.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1", Some("dbx"))
+    val spec = Map("a" -> "1", "b" -> "2")
+    createDatabase(catalog, "dbx")
+    createTable(catalog, tableIdent)
+    createTablePartition(catalog, spec, tableIdent)
+    createTablePartition(catalog, Map("a" -> "1", "b" -> "3"), tableIdent)
+    createTablePartition(catalog, Map("a" -> "2", "b" -> "2"), tableIdent)
+    createTablePartition(catalog, Map("a" -> "2", "b" -> "3"), tableIdent)
+    if (isDatasourceTable) {
+      convertToDatasourceTable(catalog, tableIdent)
+    }
+    assert(catalog.getPartition(tableIdent, spec).storage.serde.isEmpty)
+    assert(catalog.getPartition(tableIdent, spec).storage.serdeProperties.isEmpty)
+    // set table serde and/or properties (should fail on datasource tables)
+    if (isDatasourceTable) {
+      val e1 = intercept[AnalysisException] {
+        sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 'whatever'")
+      }
+      val e2 = intercept[AnalysisException] {
+        sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 'org.apache.madoop' " +
+          "WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
+      }
+      assert(e1.getMessage.contains("datasource"))
+      assert(e2.getMessage.contains("datasource"))
+    } else {
+      sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 'org.apache.jadoop'")
+      assert(catalog.getPartition(tableIdent, spec).storage.serde == Some("org.apache.jadoop"))
+      assert(catalog.getPartition(tableIdent, spec).storage.serdeProperties.isEmpty)
+      sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 'org.apache.madoop' " +
+        "WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
+      assert(catalog.getPartition(tableIdent, spec).storage.serde == Some("org.apache.madoop"))
+      assert(catalog.getPartition(tableIdent, spec).storage.serdeProperties ==
+        Map("k" -> "v", "kay" -> "vee"))
+    }
+    // set serde properties only
+    maybeWrapException(isDatasourceTable) {
+      sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) " +
+        "SET SERDEPROPERTIES ('k' = 'vvv', 'kay' = 'vee')")
+      assert(catalog.getPartition(tableIdent, spec).storage.serdeProperties ==
+        Map("k" -> "vvv", "kay" -> "vee"))
+    }
+    // set things without explicitly specifying database
+    catalog.setCurrentDatabase("dbx")
+    maybeWrapException(isDatasourceTable) {
+      sql("ALTER TABLE tab1 PARTITION (a=1, b=2) SET SERDEPROPERTIES ('kay' = 'veee')")
+      assert(catalog.getPartition(tableIdent, spec).storage.serdeProperties ==
+        Map("k" -> "vvv", "kay" -> "veee"))
+    }
+    // table to alter does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE does_not_exist SET SERDEPROPERTIES ('x' = 'y')")
+    }
+  }
+
   private def testAddPartitions(isDatasourceTable: Boolean): Unit = {
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
-- 
cgit v1.2.3