-rw-r--r--  sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4                     |   6
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala         |  18
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala      |  28
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala                      |  56
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala                         | 104
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala             | 118
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala                    | 162
-rw-r--r--  sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala |  10
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala                      |  21
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala                        |   9
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala                    |  22
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala                      |  12
12 files changed, 360 insertions(+), 206 deletions(-)
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 2f2e060b38..0e2cd39448 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -148,7 +148,7 @@ hiveNativeCommands
| ROLLBACK WORK?
| SHOW PARTITIONS tableIdentifier partitionSpec?
| DFS .*?
- | (CREATE | ALTER | DROP | SHOW | DESC | DESCRIBE | MSCK | LOAD) .*?
+ | (CREATE | ALTER | DROP | SHOW | DESC | DESCRIBE | LOAD) .*?
;
unsupportedHiveNativeCommands
@@ -177,6 +177,7 @@ unsupportedHiveNativeCommands
| kw1=UNLOCK kw2=DATABASE
| kw1=CREATE kw2=TEMPORARY kw3=MACRO
| kw1=DROP kw2=TEMPORARY kw3=MACRO
+ | kw1=MSCK kw2=REPAIR kw3=TABLE
;
createTableHeader
@@ -651,7 +652,7 @@ nonReserved
| AFTER | CASCADE | RESTRICT | BUCKETS | CLUSTERED | SORTED | PURGE | INPUTFORMAT | OUTPUTFORMAT
| INPUTDRIVER | OUTPUTDRIVER | DBPROPERTIES | DFS | TRUNCATE | METADATA | REPLICATION | COMPUTE
| STATISTICS | ANALYZE | PARTITIONED | EXTERNAL | DEFINED | RECORDWRITER
- | REVOKE | GRANT | LOCK | UNLOCK | MSCK | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE
+ | REVOKE | GRANT | LOCK | UNLOCK | MSCK | REPAIR | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE
| ROLES | COMPACTIONS | PRINCIPALS | TRANSACTIONS | INDEX | INDEXES | LOCKS | OPTION
;
@@ -867,6 +868,7 @@ GRANT: 'GRANT';
LOCK: 'LOCK';
UNLOCK: 'UNLOCK';
MSCK: 'MSCK';
+REPAIR: 'REPAIR';
EXPORT: 'EXPORT';
IMPORT: 'IMPORT';
LOAD: 'LOAD';
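
With MSCK removed from the pass-through hiveNativeCommands alternative and MSCK REPAIR TABLE added to unsupportedHiveNativeCommands (together with the new REPAIR keyword), the statement is now rejected at parse time, as exercised by the new HiveDDLCommandSuite test at the end of this change. A minimal sketch of the expected behaviour, where `parser` stands for whichever DDL parser is built on this grammar (hypothetical name here):

    import org.apache.spark.sql.catalyst.parser.ParseException

    try {
      parser.parsePlan("MSCK REPAIR TABLE tab1")  // now matches unsupportedHiveNativeCommands
    } catch {
      case e: ParseException =>
        assert(e.getMessage.toLowerCase.contains("unsupported"))
    }
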
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
index 0009438b31..0d9b0851fa 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
@@ -281,31 +281,37 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
test("drop partitions") {
val catalog = newBasicCatalog()
assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2)))
- catalog.dropPartitions("db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false)
+ catalog.dropPartitions(
+ "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false)
assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part2)))
resetState()
val catalog2 = newBasicCatalog()
assert(catalogPartitionsEqual(catalog2, "db2", "tbl2", Seq(part1, part2)))
- catalog2.dropPartitions("db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false)
+ catalog2.dropPartitions(
+ "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false)
assert(catalog2.listPartitions("db2", "tbl2").isEmpty)
}
test("drop partitions when database/table does not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
- catalog.dropPartitions("does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false)
+ catalog.dropPartitions(
+ "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false)
}
intercept[AnalysisException] {
- catalog.dropPartitions("db2", "does_not_exist", Seq(), ignoreIfNotExists = false)
+ catalog.dropPartitions(
+ "db2", "does_not_exist", Seq(), ignoreIfNotExists = false)
}
}
test("drop partitions that do not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
- catalog.dropPartitions("db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false)
+ catalog.dropPartitions(
+ "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false)
}
- catalog.dropPartitions("db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true)
+ catalog.dropPartitions(
+ "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true)
}
test("get partition") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 862fc275ad..426273e1e3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -496,19 +496,25 @@ class SessionCatalogSuite extends SparkFunSuite {
val sessionCatalog = new SessionCatalog(externalCatalog)
assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part1, part2)))
sessionCatalog.dropPartitions(
- TableIdentifier("tbl2", Some("db2")), Seq(part1.spec), ignoreIfNotExists = false)
+ TableIdentifier("tbl2", Some("db2")),
+ Seq(part1.spec),
+ ignoreIfNotExists = false)
assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part2)))
// Drop partitions without explicitly specifying database
sessionCatalog.setCurrentDatabase("db2")
sessionCatalog.dropPartitions(
- TableIdentifier("tbl2"), Seq(part2.spec), ignoreIfNotExists = false)
+ TableIdentifier("tbl2"),
+ Seq(part2.spec),
+ ignoreIfNotExists = false)
assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
// Drop multiple partitions at once
sessionCatalog.createPartitions(
TableIdentifier("tbl2", Some("db2")), Seq(part1, part2), ignoreIfExists = false)
assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part1, part2)))
sessionCatalog.dropPartitions(
- TableIdentifier("tbl2", Some("db2")), Seq(part1.spec, part2.spec), ignoreIfNotExists = false)
+ TableIdentifier("tbl2", Some("db2")),
+ Seq(part1.spec, part2.spec),
+ ignoreIfNotExists = false)
assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
}
@@ -516,11 +522,15 @@ class SessionCatalogSuite extends SparkFunSuite {
val catalog = new SessionCatalog(newBasicCatalog())
intercept[AnalysisException] {
catalog.dropPartitions(
- TableIdentifier("tbl1", Some("does_not_exist")), Seq(), ignoreIfNotExists = false)
+ TableIdentifier("tbl1", Some("does_not_exist")),
+ Seq(),
+ ignoreIfNotExists = false)
}
intercept[AnalysisException] {
catalog.dropPartitions(
- TableIdentifier("does_not_exist", Some("db2")), Seq(), ignoreIfNotExists = false)
+ TableIdentifier("does_not_exist", Some("db2")),
+ Seq(),
+ ignoreIfNotExists = false)
}
}
@@ -528,10 +538,14 @@ class SessionCatalogSuite extends SparkFunSuite {
val catalog = new SessionCatalog(newBasicCatalog())
intercept[AnalysisException] {
catalog.dropPartitions(
- TableIdentifier("tbl2", Some("db2")), Seq(part3.spec), ignoreIfNotExists = false)
+ TableIdentifier("tbl2", Some("db2")),
+ Seq(part3.spec),
+ ignoreIfNotExists = false)
}
catalog.dropPartitions(
- TableIdentifier("tbl2", Some("db2")), Seq(part3.spec), ignoreIfNotExists = true)
+ TableIdentifier("tbl2", Some("db2")),
+ Seq(part3.spec),
+ ignoreIfNotExists = true)
}
test("get partition") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 3da715cdb3..73d9640c35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -493,7 +493,9 @@ class SparkSqlAstBuilder extends AstBuilder {
*/
override def visitAddTablePartition(
ctx: AddTablePartitionContext): LogicalPlan = withOrigin(ctx) {
- if (ctx.VIEW != null) throw new ParseException(s"Operation not allowed: partitioned views", ctx)
+ if (ctx.VIEW != null) {
+ throw new AnalysisException(s"Operation not allowed: partitioned views")
+ }
// Create partition spec to location mapping.
val specsAndLocs = if (ctx.partitionSpec.isEmpty) {
ctx.partitionSpecLocation.asScala.map {
@@ -509,8 +511,7 @@ class SparkSqlAstBuilder extends AstBuilder {
AlterTableAddPartition(
visitTableIdentifier(ctx.tableIdentifier),
specsAndLocs,
- ctx.EXISTS != null)(
- command(ctx))
+ ctx.EXISTS != null)
}
/**
@@ -523,11 +524,8 @@ class SparkSqlAstBuilder extends AstBuilder {
*/
override def visitExchangeTablePartition(
ctx: ExchangeTablePartitionContext): LogicalPlan = withOrigin(ctx) {
- AlterTableExchangePartition(
- visitTableIdentifier(ctx.from),
- visitTableIdentifier(ctx.to),
- visitNonOptionalPartitionSpec(ctx.partitionSpec))(
- command(ctx))
+ throw new AnalysisException(
+ "Operation not allowed: ALTER TABLE ... EXCHANGE PARTITION ...")
}
/**
@@ -543,8 +541,7 @@ class SparkSqlAstBuilder extends AstBuilder {
AlterTableRenamePartition(
visitTableIdentifier(ctx.tableIdentifier),
visitNonOptionalPartitionSpec(ctx.from),
- visitNonOptionalPartitionSpec(ctx.to))(
- command(ctx))
+ visitNonOptionalPartitionSpec(ctx.to))
}
/**
@@ -561,13 +558,16 @@ class SparkSqlAstBuilder extends AstBuilder {
*/
override def visitDropTablePartitions(
ctx: DropTablePartitionsContext): LogicalPlan = withOrigin(ctx) {
- if (ctx.VIEW != null) throw new ParseException(s"Operation not allowed: partitioned views", ctx)
+ if (ctx.VIEW != null) {
+ throw new AnalysisException(s"Operation not allowed: partitioned views")
+ }
+ if (ctx.PURGE != null) {
+ throw new AnalysisException(s"Operation not allowed: PURGE")
+ }
AlterTableDropPartition(
visitTableIdentifier(ctx.tableIdentifier),
ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
- ctx.EXISTS != null,
- ctx.PURGE != null)(
- command(ctx))
+ ctx.EXISTS != null)
}
/**
@@ -580,10 +580,8 @@ class SparkSqlAstBuilder extends AstBuilder {
*/
override def visitArchiveTablePartition(
ctx: ArchiveTablePartitionContext): LogicalPlan = withOrigin(ctx) {
- AlterTableArchivePartition(
- visitTableIdentifier(ctx.tableIdentifier),
- visitNonOptionalPartitionSpec(ctx.partitionSpec))(
- command(ctx))
+ throw new AnalysisException(
+ "Operation not allowed: ALTER TABLE ... ARCHIVE PARTITION ...")
}
/**
@@ -596,10 +594,8 @@ class SparkSqlAstBuilder extends AstBuilder {
*/
override def visitUnarchiveTablePartition(
ctx: UnarchiveTablePartitionContext): LogicalPlan = withOrigin(ctx) {
- AlterTableUnarchivePartition(
- visitTableIdentifier(ctx.tableIdentifier),
- visitNonOptionalPartitionSpec(ctx.partitionSpec))(
- command(ctx))
+ throw new AnalysisException(
+ "Operation not allowed: ALTER TABLE ... UNARCHIVE PARTITION ...")
}
/**
@@ -658,10 +654,7 @@ class SparkSqlAstBuilder extends AstBuilder {
* }}}
*/
override def visitTouchTable(ctx: TouchTableContext): LogicalPlan = withOrigin(ctx) {
- AlterTableTouch(
- visitTableIdentifier(ctx.tableIdentifier),
- Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))(
- command(ctx))
+ throw new AnalysisException("Operation not allowed: ALTER TABLE ... TOUCH ...")
}
/**
@@ -673,11 +666,7 @@ class SparkSqlAstBuilder extends AstBuilder {
* }}}
*/
override def visitCompactTable(ctx: CompactTableContext): LogicalPlan = withOrigin(ctx) {
- AlterTableCompact(
- visitTableIdentifier(ctx.tableIdentifier),
- Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec),
- string(ctx.STRING))(
- command(ctx))
+ throw new AnalysisException("Operation not allowed: ALTER TABLE ... COMPACT ...")
}
/**
@@ -689,10 +678,7 @@ class SparkSqlAstBuilder extends AstBuilder {
* }}}
*/
override def visitConcatenateTable(ctx: ConcatenateTableContext): LogicalPlan = withOrigin(ctx) {
- AlterTableMerge(
- visitTableIdentifier(ctx.tableIdentifier),
- Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))(
- command(ctx))
+ throw new AnalysisException("Operation not allowed: ALTER TABLE ... CONCATENATE")
}
/**
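
The net effect of these SparkSqlAstBuilder changes is that the ALTER TABLE variants Spark does not implement (EXCHANGE, ARCHIVE, UNARCHIVE, TOUCH, COMPACT, CONCATENATE, and PURGE on DROP PARTITION) now fail with an AnalysisException when the plan is built, instead of producing a NativeDDLCommand. A minimal sketch, mirroring the assertUnsupported helper added to DDLCommandSuite below:

    import org.apache.spark.sql.AnalysisException
    import org.apache.spark.sql.execution.SparkSqlParser

    try {
      SparkSqlParser.parsePlan(
        "ALTER TABLE table_name_1 EXCHANGE PARTITION (dt='2008-08-08') WITH TABLE table_name_2")
    } catch {
      case e: AnalysisException =>
        // expected: "Operation not allowed: ALTER TABLE ... EXCHANGE PARTITION ..."
        assert(e.getMessage.toLowerCase.contains("operation not allowed"))
    }
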
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 8a37cf8f4c..c55b1a690e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -20,7 +20,8 @@ package org.apache.spark.sql.execution.command
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types._
@@ -348,53 +349,94 @@ case class AlterTableSerDeProperties(
}
/**
- * Add Partition in ALTER TABLE/VIEW: add the table/view partitions.
+ * Add Partition in ALTER TABLE: add the table partitions.
+ *
* 'partitionSpecsAndLocs': the syntax of ALTER VIEW is identical to ALTER TABLE,
* EXCEPT that it is ILLEGAL to specify a LOCATION clause.
* An error message will be issued if the partition exists, unless 'ifNotExists' is true.
+ *
+ * The syntax of this command is:
+ * {{{
+ * ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec [LOCATION 'loc1']
+ * }}}
*/
case class AlterTableAddPartition(
tableName: TableIdentifier,
partitionSpecsAndLocs: Seq[(TablePartitionSpec, Option[String])],
- ifNotExists: Boolean)(sql: String)
- extends NativeDDLCommand(sql) with Logging
+ ifNotExists: Boolean)
+ extends RunnableCommand {
+
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ val catalog = sqlContext.sessionState.catalog
+ val table = catalog.getTableMetadata(tableName)
+ if (DDLUtils.isDatasourceTable(table)) {
+ throw new AnalysisException(
+ "alter table add partition is not allowed for tables defined using the datasource API")
+ }
+ val parts = partitionSpecsAndLocs.map { case (spec, location) =>
+ // inherit table storage format (possibly except for location)
+ CatalogTablePartition(spec, table.storage.copy(locationUri = location))
+ }
+ catalog.createPartitions(tableName, parts, ignoreIfExists = ifNotExists)
+ Seq.empty[Row]
+ }
+}
+
+/**
+ * Alter a table partition's spec.
+ *
+ * The syntax of this command is:
+ * {{{
+ * ALTER TABLE table PARTITION spec1 RENAME TO PARTITION spec2;
+ * }}}
+ */
case class AlterTableRenamePartition(
tableName: TableIdentifier,
oldPartition: TablePartitionSpec,
- newPartition: TablePartitionSpec)(sql: String)
- extends NativeDDLCommand(sql) with Logging
+ newPartition: TablePartitionSpec)
+ extends RunnableCommand {
-case class AlterTableExchangePartition(
- fromTableName: TableIdentifier,
- toTableName: TableIdentifier,
- spec: TablePartitionSpec)(sql: String)
- extends NativeDDLCommand(sql) with Logging
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ sqlContext.sessionState.catalog.renamePartitions(
+ tableName, Seq(oldPartition), Seq(newPartition))
+ Seq.empty[Row]
+ }
+
+}
/**
- * Drop Partition in ALTER TABLE/VIEW: to drop a particular partition for a table/view.
+ * Drop Partition in ALTER TABLE: to drop a particular partition for a table.
+ *
* This removes the data and metadata for this partition.
* The data is actually moved to the .Trash/Current directory if Trash is configured,
* unless 'purge' is true, but the metadata is completely lost.
* An error message will be issued if the partition does not exist, unless 'ifExists' is true.
* Note: purge is always false when the target is a view.
+ *
+ * The syntax of this command is:
+ * {{{
+ * ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
+ * }}}
*/
case class AlterTableDropPartition(
tableName: TableIdentifier,
specs: Seq[TablePartitionSpec],
- ifExists: Boolean,
- purge: Boolean)(sql: String)
- extends NativeDDLCommand(sql) with Logging
+ ifExists: Boolean)
+ extends RunnableCommand {
-case class AlterTableArchivePartition(
- tableName: TableIdentifier,
- spec: TablePartitionSpec)(sql: String)
- extends NativeDDLCommand(sql) with Logging
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ val catalog = sqlContext.sessionState.catalog
+ val table = catalog.getTableMetadata(tableName)
+ if (DDLUtils.isDatasourceTable(table)) {
+ throw new AnalysisException(
+ "alter table drop partition is not allowed for tables defined using the datasource API")
+ }
+ catalog.dropPartitions(tableName, specs, ignoreIfNotExists = ifExists)
+ Seq.empty[Row]
+ }
-case class AlterTableUnarchivePartition(
- tableName: TableIdentifier,
- spec: TablePartitionSpec)(sql: String)
- extends NativeDDLCommand(sql) with Logging
+}
case class AlterTableSetFileFormat(
tableName: TableIdentifier,
@@ -453,22 +495,6 @@ case class AlterTableSetLocation(
}
-case class AlterTableTouch(
- tableName: TableIdentifier,
- partitionSpec: Option[TablePartitionSpec])(sql: String)
- extends NativeDDLCommand(sql) with Logging
-
-case class AlterTableCompact(
- tableName: TableIdentifier,
- partitionSpec: Option[TablePartitionSpec],
- compactType: String)(sql: String)
- extends NativeDDLCommand(sql) with Logging
-
-case class AlterTableMerge(
- tableName: TableIdentifier,
- partitionSpec: Option[TablePartitionSpec])(sql: String)
- extends NativeDDLCommand(sql) with Logging
-
case class AlterTableChangeCol(
tableName: TableIdentifier,
partitionSpec: Option[TablePartitionSpec],
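
With ADD PARTITION, RENAME PARTITION, and DROP PARTITION now implemented as RunnableCommands, these statements take effect directly against the session catalog. A hedged usage sketch (the table name `logs` and partition column `ds` are illustrative; the table is assumed to be a Hive-backed, non-datasource table, since the add/drop commands above reject datasource tables):

    // sqlContext is an existing SQLContext
    sqlContext.sql(
      "ALTER TABLE logs ADD IF NOT EXISTS PARTITION (ds='2016-04-09') LOCATION '/data/logs/2016-04-09'")
    sqlContext.sql(
      "ALTER TABLE logs PARTITION (ds='2016-04-09') RENAME TO PARTITION (ds='2016-04-10')")
    sqlContext.sql(
      "ALTER TABLE logs DROP IF EXISTS PARTITION (ds='2016-04-10')")
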
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index ac69518ddf..1c8dd68286 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.command
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.PlanTest
@@ -24,9 +25,17 @@ import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.types._
+// TODO: merge this with DDLSuite (SPARK-14441)
class DDLCommandSuite extends PlanTest {
private val parser = SparkSqlParser
+ private def assertUnsupported(sql: String): Unit = {
+ val e = intercept[AnalysisException] {
+ parser.parsePlan(sql)
+ }
+ assert(e.getMessage.toLowerCase.contains("operation not allowed"))
+ }
+
test("create database") {
val sql =
"""
@@ -326,11 +335,11 @@ class DDLCommandSuite extends PlanTest {
Seq(
(Map("dt" -> "2008-08-08", "country" -> "us"), Some("location1")),
(Map("dt" -> "2009-09-09", "country" -> "uk"), None)),
- ifNotExists = true)(sql1)
+ ifNotExists = true)
val expected2 = AlterTableAddPartition(
TableIdentifier("table_name", None),
Seq((Map("dt" -> "2008-08-08"), Some("loc"))),
- ifNotExists = false)(sql2)
+ ifNotExists = false)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
@@ -369,22 +378,16 @@ class DDLCommandSuite extends PlanTest {
val expected = AlterTableRenamePartition(
TableIdentifier("table_name", None),
Map("dt" -> "2008-08-08", "country" -> "us"),
- Map("dt" -> "2008-09-09", "country" -> "uk"))(sql)
+ Map("dt" -> "2008-09-09", "country" -> "uk"))
comparePlans(parsed, expected)
}
- test("alter table: exchange partition") {
- val sql =
+ test("alter table: exchange partition (not supported)") {
+ assertUnsupported(
"""
|ALTER TABLE table_name_1 EXCHANGE PARTITION
|(dt='2008-08-08', country='us') WITH TABLE table_name_2
- """.stripMargin
- val parsed = parser.parsePlan(sql)
- val expected = AlterTableExchangePartition(
- TableIdentifier("table_name_1", None),
- TableIdentifier("table_name_2", None),
- Map("dt" -> "2008-08-08", "country" -> "us"))(sql)
- comparePlans(parsed, expected)
+ """.stripMargin)
}
// ALTER TABLE table_name DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE]
@@ -405,7 +408,10 @@ class DDLCommandSuite extends PlanTest {
val sql2_view = sql2_table.replace("TABLE", "VIEW").replace("PURGE", "")
val parsed1_table = parser.parsePlan(sql1_table)
- val parsed2_table = parser.parsePlan(sql2_table)
+ val e = intercept[ParseException] {
+ parser.parsePlan(sql2_table)
+ }
+ assert(e.getMessage.contains("Operation not allowed"))
intercept[ParseException] {
parser.parsePlan(sql1_view)
@@ -420,36 +426,17 @@ class DDLCommandSuite extends PlanTest {
Seq(
Map("dt" -> "2008-08-08", "country" -> "us"),
Map("dt" -> "2009-09-09", "country" -> "uk")),
- ifExists = true,
- purge = false)(sql1_table)
- val expected2_table = AlterTableDropPartition(
- tableIdent,
- Seq(
- Map("dt" -> "2008-08-08", "country" -> "us"),
- Map("dt" -> "2009-09-09", "country" -> "uk")),
- ifExists = false,
- purge = true)(sql2_table)
+ ifExists = true)
comparePlans(parsed1_table, expected1_table)
- comparePlans(parsed2_table, expected2_table)
}
- test("alter table: archive partition") {
- val sql = "ALTER TABLE table_name ARCHIVE PARTITION (dt='2008-08-08', country='us')"
- val parsed = parser.parsePlan(sql)
- val expected = AlterTableArchivePartition(
- TableIdentifier("table_name", None),
- Map("dt" -> "2008-08-08", "country" -> "us"))(sql)
- comparePlans(parsed, expected)
+ test("alter table: archive partition (not supported)") {
+ assertUnsupported("ALTER TABLE table_name ARCHIVE PARTITION (dt='2008-08-08', country='us')")
}
- test("alter table: unarchive partition") {
- val sql = "ALTER TABLE table_name UNARCHIVE PARTITION (dt='2008-08-08', country='us')"
- val parsed = parser.parsePlan(sql)
- val expected = AlterTableUnarchivePartition(
- TableIdentifier("table_name", None),
- Map("dt" -> "2008-08-08", "country" -> "us"))(sql)
- comparePlans(parsed, expected)
+ test("alter table: unarchive partition (not supported)") {
+ assertUnsupported("ALTER TABLE table_name UNARCHIVE PARTITION (dt='2008-08-08', country='us')")
}
test("alter table: set file format") {
@@ -505,55 +492,24 @@ class DDLCommandSuite extends PlanTest {
comparePlans(parsed2, expected2)
}
- test("alter table: touch") {
- val sql1 = "ALTER TABLE table_name TOUCH"
- val sql2 = "ALTER TABLE table_name TOUCH PARTITION (dt='2008-08-08', country='us')"
- val parsed1 = parser.parsePlan(sql1)
- val parsed2 = parser.parsePlan(sql2)
- val tableIdent = TableIdentifier("table_name", None)
- val expected1 = AlterTableTouch(
- tableIdent,
- None)(sql1)
- val expected2 = AlterTableTouch(
- tableIdent,
- Some(Map("dt" -> "2008-08-08", "country" -> "us")))(sql2)
- comparePlans(parsed1, expected1)
- comparePlans(parsed2, expected2)
+ test("alter table: touch (not supported)") {
+ assertUnsupported("ALTER TABLE table_name TOUCH")
+ assertUnsupported("ALTER TABLE table_name TOUCH PARTITION (dt='2008-08-08', country='us')")
}
- test("alter table: compact") {
- val sql1 = "ALTER TABLE table_name COMPACT 'compaction_type'"
- val sql2 =
+ test("alter table: compact (not supported)") {
+ assertUnsupported("ALTER TABLE table_name COMPACT 'compaction_type'")
+ assertUnsupported(
"""
- |ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us')
- |COMPACT 'MAJOR'
- """.stripMargin
- val parsed1 = parser.parsePlan(sql1)
- val parsed2 = parser.parsePlan(sql2)
- val tableIdent = TableIdentifier("table_name", None)
- val expected1 = AlterTableCompact(
- tableIdent,
- None,
- "compaction_type")(sql1)
- val expected2 = AlterTableCompact(
- tableIdent,
- Some(Map("dt" -> "2008-08-08", "country" -> "us")),
- "MAJOR")(sql2)
- comparePlans(parsed1, expected1)
- comparePlans(parsed2, expected2)
+ |ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us')
+ |COMPACT 'MAJOR'
+ """.stripMargin)
}
- test("alter table: concatenate") {
- val sql1 = "ALTER TABLE table_name CONCATENATE"
- val sql2 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') CONCATENATE"
- val parsed1 = parser.parsePlan(sql1)
- val parsed2 = parser.parsePlan(sql2)
- val tableIdent = TableIdentifier("table_name", None)
- val expected1 = AlterTableMerge(tableIdent, None)(sql1)
- val expected2 = AlterTableMerge(
- tableIdent, Some(Map("dt" -> "2008-08-08", "country" -> "us")))(sql2)
- comparePlans(parsed1, expected1)
- comparePlans(parsed2, expected2)
+ test("alter table: concatenate (not supported)") {
+ assertUnsupported("ALTER TABLE table_name CONCATENATE")
+ assertUnsupported(
+ "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') CONCATENATE")
}
test("alter table: change column name/type/position/comment") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index c6479bf33e..40a8b0e614 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -58,6 +58,10 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(e.getMessage.toLowerCase.contains("operation not allowed"))
}
+ private def maybeWrapException[T](expectException: Boolean)(body: => T): Unit = {
+ if (expectException) intercept[AnalysisException] { body } else body
+ }
+
private def createDatabase(catalog: SessionCatalog, name: String): Unit = {
catalog.createDatabase(CatalogDatabase(name, "", "", Map()), ignoreIfExists = false)
}
@@ -320,6 +324,62 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assertUnsupported("ALTER TABLE dbx.tab1 NOT STORED AS DIRECTORIES")
}
+ test("alter table: add partition") {
+ testAddPartitions(isDatasourceTable = false)
+ }
+
+ test("alter table: add partition (datasource table)") {
+ testAddPartitions(isDatasourceTable = true)
+ }
+
+ test("alter table: add partition is not supported for views") {
+ assertUnsupported("ALTER VIEW dbx.tab1 ADD IF NOT EXISTS PARTITION (b='2')")
+ }
+
+ test("alter table: drop partition") {
+ testDropPartitions(isDatasourceTable = false)
+ }
+
+ test("alter table: drop partition (datasource table)") {
+ testDropPartitions(isDatasourceTable = true)
+ }
+
+ test("alter table: drop partition is not supported for views") {
+ assertUnsupported("ALTER VIEW dbx.tab1 DROP IF EXISTS PARTITION (b='2')")
+ }
+
+ test("alter table: rename partition") {
+ val catalog = sqlContext.sessionState.catalog
+ val tableIdent = TableIdentifier("tab1", Some("dbx"))
+ val part1 = Map("a" -> "1")
+ val part2 = Map("b" -> "2")
+ val part3 = Map("c" -> "3")
+ createDatabase(catalog, "dbx")
+ createTable(catalog, tableIdent)
+ createTablePartition(catalog, part1, tableIdent)
+ createTablePartition(catalog, part2, tableIdent)
+ createTablePartition(catalog, part3, tableIdent)
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+ Set(part1, part2, part3))
+ sql("ALTER TABLE dbx.tab1 PARTITION (a='1') RENAME TO PARTITION (a='100')")
+ sql("ALTER TABLE dbx.tab1 PARTITION (b='2') RENAME TO PARTITION (b='200')")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+ Set(Map("a" -> "100"), Map("b" -> "200"), part3))
+ // rename without explicitly specifying database
+ catalog.setCurrentDatabase("dbx")
+ sql("ALTER TABLE tab1 PARTITION (a='100') RENAME TO PARTITION (a='10')")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+ Set(Map("a" -> "10"), Map("b" -> "200"), part3))
+ // table to alter does not exist
+ intercept[AnalysisException] {
+ sql("ALTER TABLE does_not_exist PARTITION (c='3') RENAME TO PARTITION (c='333')")
+ }
+ // partition to rename does not exist
+ intercept[AnalysisException] {
+ sql("ALTER TABLE tab1 PARTITION (x='300') RENAME TO PARTITION (x='333')")
+ }
+ }
+
// TODO: ADD a testcase for Drop Database in Restric when we can create tables in SQLContext
test("show tables") {
@@ -487,10 +547,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(storageFormat.locationUri === Some(expected))
}
}
- // Optionally expect AnalysisException
- def maybeWrapException[T](expectException: Boolean)(body: => T): Unit = {
- if (expectException) intercept[AnalysisException] { body } else body
- }
// set table location
sql("ALTER TABLE dbx.tab1 SET LOCATION '/path/to/your/lovely/heart'")
verifyLocation("/path/to/your/lovely/heart")
@@ -564,4 +620,102 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
+ private def testAddPartitions(isDatasourceTable: Boolean): Unit = {
+ val catalog = sqlContext.sessionState.catalog
+ val tableIdent = TableIdentifier("tab1", Some("dbx"))
+ val part1 = Map("a" -> "1")
+ val part2 = Map("b" -> "2")
+ val part3 = Map("c" -> "3")
+ val part4 = Map("d" -> "4")
+ createDatabase(catalog, "dbx")
+ createTable(catalog, tableIdent)
+ createTablePartition(catalog, part1, tableIdent)
+ if (isDatasourceTable) {
+ convertToDatasourceTable(catalog, tableIdent)
+ }
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
+ maybeWrapException(isDatasourceTable) {
+ sql("ALTER TABLE dbx.tab1 ADD IF NOT EXISTS " +
+ "PARTITION (b='2') LOCATION 'paris' PARTITION (c='3')")
+ }
+ if (!isDatasourceTable) {
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
+ assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isEmpty)
+ assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Some("paris"))
+ assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isEmpty)
+ }
+ // add partitions without explicitly specifying database
+ catalog.setCurrentDatabase("dbx")
+ maybeWrapException(isDatasourceTable) {
+ sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (d='4')")
+ }
+ if (!isDatasourceTable) {
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+ Set(part1, part2, part3, part4))
+ }
+ // table to alter does not exist
+ intercept[AnalysisException] {
+ sql("ALTER TABLE does_not_exist ADD IF NOT EXISTS PARTITION (d='4')")
+ }
+ // partition to add already exists
+ intercept[AnalysisException] {
+ sql("ALTER TABLE tab1 ADD PARTITION (d='4')")
+ }
+ maybeWrapException(isDatasourceTable) {
+ sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (d='4')")
+ }
+ if (!isDatasourceTable) {
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+ Set(part1, part2, part3, part4))
+ }
+ }
+
+ private def testDropPartitions(isDatasourceTable: Boolean): Unit = {
+ val catalog = sqlContext.sessionState.catalog
+ val tableIdent = TableIdentifier("tab1", Some("dbx"))
+ val part1 = Map("a" -> "1")
+ val part2 = Map("b" -> "2")
+ val part3 = Map("c" -> "3")
+ val part4 = Map("d" -> "4")
+ createDatabase(catalog, "dbx")
+ createTable(catalog, tableIdent)
+ createTablePartition(catalog, part1, tableIdent)
+ createTablePartition(catalog, part2, tableIdent)
+ createTablePartition(catalog, part3, tableIdent)
+ createTablePartition(catalog, part4, tableIdent)
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+ Set(part1, part2, part3, part4))
+ if (isDatasourceTable) {
+ convertToDatasourceTable(catalog, tableIdent)
+ }
+ maybeWrapException(isDatasourceTable) {
+ sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (d='4'), PARTITION (c='3')")
+ }
+ if (!isDatasourceTable) {
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2))
+ }
+ // drop partitions without explicitly specifying database
+ catalog.setCurrentDatabase("dbx")
+ maybeWrapException(isDatasourceTable) {
+ sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (b='2')")
+ }
+ if (!isDatasourceTable) {
+ assert(catalog.listPartitions(tableIdent).map(_.spec) == Seq(part1))
+ }
+ // table to alter does not exist
+ intercept[AnalysisException] {
+ sql("ALTER TABLE does_not_exist DROP IF EXISTS PARTITION (b='2')")
+ }
+ // partition to drop does not exist
+ intercept[AnalysisException] {
+ sql("ALTER TABLE tab1 DROP PARTITION (x='300')")
+ }
+ maybeWrapException(isDatasourceTable) {
+ sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (x='300')")
+ }
+ if (!isDatasourceTable) {
+ assert(catalog.listPartitions(tableIdent).map(_.spec) == Seq(part1))
+ }
+ }
+
}
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 9e3cb18d45..f0eeda09db 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -376,7 +376,13 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
// Create partitioned view is not supported
"create_like_view",
- "describe_formatted_view_partitioned"
+ "describe_formatted_view_partitioned",
+
+ // This uses CONCATENATE, which we don't support
+ "alter_merge_2",
+
+ // TOUCH is not supported
+ "touch"
)
/**
@@ -392,7 +398,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
"alter2",
"alter3",
"alter5",
- "alter_merge_2",
"alter_partition_format_loc",
"alter_partition_with_whitelist",
"alter_rename_partition",
@@ -897,7 +902,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
"timestamp_comparison",
"timestamp_lazy",
"timestamp_null",
- "touch",
"transform_ppr1",
"transform_ppr2",
"truncate_table",
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index a49ce33ba1..482f47428d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -219,26 +219,7 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat
parts: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean): Unit = withClient {
requireTableExists(db, table)
- // Note: Unfortunately Hive does not currently support `ignoreIfNotExists` so we
- // need to implement it here ourselves. This is currently somewhat expensive because
- // we make multiple synchronous calls to Hive for each partition we want to drop.
- val partsToDrop =
- if (ignoreIfNotExists) {
- parts.filter { spec =>
- try {
- getPartition(db, table, spec)
- true
- } catch {
- // Filter out the partitions that do not actually exist
- case _: AnalysisException => false
- }
- }
- } else {
- parts
- }
- if (partsToDrop.nonEmpty) {
- client.dropPartitions(db, table, partsToDrop)
- }
+ client.dropPartitions(db, table, parts, ignoreIfNotExists)
}
override def renamePartitions(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 94794b1572..6f7e7bf451 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -120,16 +120,13 @@ private[hive] trait HiveClient {
ignoreIfExists: Boolean): Unit
/**
- * Drop one or many partitions in the given table.
- *
- * Note: Unfortunately, Hive does not currently provide a way to ignore this call if the
- * partitions do not already exist. The seemingly relevant flag `ifExists` in
- * [[org.apache.hadoop.hive.metastore.PartitionDropOptions]] is not read anywhere.
+ * Drop one or many partitions in the given table, assuming they exist.
*/
def dropPartitions(
db: String,
table: String,
- specs: Seq[ExternalCatalog.TablePartitionSpec]): Unit
+ specs: Seq[ExternalCatalog.TablePartitionSpec],
+ ignoreIfNotExists: Boolean): Unit
/**
* Rename one or many existing table partitions, assuming they exist.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index a037671ef0..39e26acd7f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.cli.CliSessionState
import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
+import org.apache.hadoop.hive.metastore.{PartitionDropOptions, TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Function => HiveFunction, FunctionType, PrincipalType, ResourceType, ResourceUri}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
@@ -367,9 +367,25 @@ private[hive] class HiveClientImpl(
override def dropPartitions(
db: String,
table: String,
- specs: Seq[ExternalCatalog.TablePartitionSpec]): Unit = withHiveState {
+ specs: Seq[ExternalCatalog.TablePartitionSpec],
+ ignoreIfNotExists: Boolean): Unit = withHiveState {
// TODO: figure out how to drop multiple partitions in one call
- specs.foreach { s => client.dropPartition(db, table, s.values.toList.asJava, true) }
+ val hiveTable = client.getTable(db, table, true /* throw exception */)
+ specs.foreach { s =>
+ // The provided spec here can be a partial spec, i.e. it will match all partitions
+ // whose specs are supersets of this partial spec. E.g. If a table has partitions
+ // (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both.
+ val matchingParts = client.getPartitions(hiveTable, s.asJava).asScala
+ if (matchingParts.isEmpty && !ignoreIfNotExists) {
+ throw new AnalysisException(
+ s"partition to drop '$s' does not exist in table '$table' database '$db'")
+ }
+ matchingParts.foreach { hivePartition =>
+ val dropOptions = new PartitionDropOptions
+ dropOptions.ifExists = ignoreIfNotExists
+ client.dropPartition(db, table, hivePartition.getValues, dropOptions)
+ }
+ }
}
override def renamePartitions(
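
The partial-spec matching described in the new dropPartitions comment can be read as follows. A sketch against the updated HiveClient interface, with hypothetical names (`client` is a HiveClientImpl; database `db1` has a table `tbl` partitioned by (b, c) with partitions (b='1', c='1') and (b='1', c='2')):

    // A partial spec of (b='1') matches both partitions, so both are dropped.
    client.dropPartitions("db1", "tbl", Seq(Map("b" -> "1")), ignoreIfNotExists = false)

    // A spec that matches nothing throws an AnalysisException unless
    // ignoreIfNotExists is true, in which case it is silently skipped.
    client.dropPartitions("db1", "tbl", Seq(Map("b" -> "9")), ignoreIfNotExists = true)
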
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index a144da4997..e8086aec32 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -41,6 +41,13 @@ class HiveDDLCommandSuite extends PlanTest {
}.head
}
+ private def assertUnsupported(sql: String): Unit = {
+ val e = intercept[ParseException] {
+ parser.parsePlan(sql)
+ }
+ assert(e.getMessage.toLowerCase.contains("unsupported"))
+ }
+
test("Test CTAS #1") {
val s1 =
"""CREATE EXTERNAL TABLE IF NOT EXISTS mydb.page_view
@@ -367,4 +374,9 @@ class HiveDDLCommandSuite extends PlanTest {
parser.parsePlan(v1).isInstanceOf[HiveNativeCommand]
}
}
+
+ test("MSCK repair table (not supported)") {
+ assertUnsupported("MSCK REPAIR TABLE tab1")
+ }
+
}