author     Dongjoon Hyun <dongjoon@apache.org>            2016-11-15 15:59:04 -0800
committer  Herman van Hovell <hvanhovell@databricks.com>  2016-11-15 15:59:04 -0800
commit     3ce057d0010a0f6f8163046ba502a126adc68f33
tree       0efbff69f01d6b6ceff9092873b74b92b7c1554d
parent     503378f10ca92064034aa88e0feebe4718af8bbe
[SPARK-17732][SQL] ALTER TABLE DROP PARTITION should support comparators
## What changes were proposed in this pull request?

This PR aims to support comparators (e.g. `<`, `<=`, `>`, `>=`) in `ALTER TABLE ... DROP PARTITION` again in Apache Spark 2.0, restoring the Spark 1.6 behavior for backward compatibility.

**Spark 1.6**

``` scala
scala> sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
res0: org.apache.spark.sql.DataFrame = [result: string]

scala> sql("ALTER TABLE sales DROP PARTITION (country < 'KR')")
res1: org.apache.spark.sql.DataFrame = [result: string]
```

**Spark 2.0**

``` scala
scala> sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
res0: org.apache.spark.sql.DataFrame = []

scala> sql("ALTER TABLE sales DROP PARTITION (country < 'KR')")
org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input '<' expecting {')', ','}(line 1, pos 42)
```

After this PR, the comparator syntax is supported again.

## How was this patch tested?

Passes the Jenkins tests with a newly added test case.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #15704 from dongjoon-hyun/SPARK-17732-2.
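As a hedged illustration of the restored behavior (expected output mirroring the sessions above, not captured from an actual run):

**Spark 2.0 + this patch (expected)**

``` scala
scala> sql("ALTER TABLE sales DROP PARTITION (country < 'KR')")
res1: org.apache.spark.sql.DataFrame = []
```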
-rw-r--r--  sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4                  6
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala            30
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala                   2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala                     51
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala   8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala          9
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala              103
7 files changed, 185 insertions(+), 24 deletions(-)
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index b599a88495..fcca11c69f 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -239,11 +239,7 @@ partitionSpecLocation
;
partitionSpec
- : PARTITION '(' partitionVal (',' partitionVal)* ')'
- ;
-
-partitionVal
- : identifier (EQ constant)?
+ : PARTITION '(' expression (',' expression)* ')'
;
describeFuncName
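With `partitionSpec` now accepting arbitrary `expression`s instead of only `identifier (EQ constant)?` pairs, comparator clauses pass the parser; invalid forms (such as the null-safe `<=>`) are rejected later in `AstBuilder`. For instance, the following statement, also exercised by the new `HiveDDLSuite` tests further down, now parses:

``` scala
sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter > '2')")
```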
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 3fa7bf1cdb..8e8d37407c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -194,10 +194,15 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
*/
override def visitPartitionSpec(
ctx: PartitionSpecContext): Map[String, Option[String]] = withOrigin(ctx) {
- val parts = ctx.partitionVal.asScala.map { pVal =>
- val name = pVal.identifier.getText
- val value = Option(pVal.constant).map(visitStringConstant)
- name -> value
+ val parts = ctx.expression.asScala.map { pVal =>
+ expression(pVal) match {
+ case UnresolvedAttribute(name :: Nil) =>
+ name -> None
+ case cmp @ EqualTo(UnresolvedAttribute(name :: Nil), constant: Literal) =>
+ name -> Option(constant.toString)
+ case _ =>
+ throw new ParseException("Invalid partition filter specification", ctx)
+ }
}
// Before calling `toMap`, we check duplicated keys to avoid silently ignore partition values
// in partition spec like PARTITION(a='1', b='2', a='3'). The real semantical check for
@@ -207,6 +212,23 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
}
/**
+ * Create a partition filter specification.
+ */
+ def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = withOrigin(ctx) {
+ val parts = ctx.expression.asScala.map { pVal =>
+ expression(pVal) match {
+ case EqualNullSafe(_, _) =>
+ throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx)
+ case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) =>
+ cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), constant))
+ case _ =>
+ throw new ParseException("Invalid partition filter specification", ctx)
+ }
+ }
+ parts.reduceLeft(And)
+ }
+
+ /**
* Create a partition specification map without optional values.
*/
protected def visitNonOptionalPartitionSpec(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index b8be3d17ba..112d812cb6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -813,7 +813,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
AlterTableDropPartitionCommand(
visitTableIdentifier(ctx.tableIdentifier),
- ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
+ ctx.partitionSpec.asScala.map(visitPartitionFilterSpec),
ctx.EXISTS != null,
ctx.PURGE != null)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 84a63fdb9f..6c1c398940 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -31,7 +31,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryComparison}
+import org.apache.spark.sql.catalyst.expressions.{EqualTo, Expression, PredicateHelper}
import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, PartitioningUtils}
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration
@@ -418,27 +419,55 @@ case class AlterTableRenamePartitionCommand(
*/
case class AlterTableDropPartitionCommand(
tableName: TableIdentifier,
- specs: Seq[TablePartitionSpec],
+ specs: Seq[Expression],
ifExists: Boolean,
purge: Boolean)
- extends RunnableCommand {
+ extends RunnableCommand with PredicateHelper {
+
+ private def isRangeComparison(expr: Expression): Boolean = {
+ expr.find(e => e.isInstanceOf[BinaryComparison] && !e.isInstanceOf[EqualTo]).isDefined
+ }
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
+ val resolver = sparkSession.sessionState.conf.resolver
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
- val normalizedSpecs = specs.map { spec =>
- PartitioningUtils.normalizePartitionSpec(
- spec,
- table.partitionColumnNames,
- table.identifier.quotedString,
- sparkSession.sessionState.conf.resolver)
+ specs.foreach { expr =>
+ expr.references.foreach { attr =>
+ if (!table.partitionColumnNames.exists(resolver(_, attr.name))) {
+ throw new AnalysisException(s"${attr.name} is not a valid partition column " +
+ s"in table ${table.identifier.quotedString}.")
+ }
+ }
}
- catalog.dropPartitions(
- table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge)
+ if (specs.exists(isRangeComparison)) {
+ val partitionSet = specs.flatMap { spec =>
+ val partitions = catalog.listPartitionsByFilter(table.identifier, Seq(spec)).map(_.spec)
+ if (partitions.isEmpty && !ifExists) {
+ throw new AnalysisException(s"There is no partition for ${spec.sql}")
+ }
+ partitions
+ }.distinct
+ catalog.dropPartitions(
+ table.identifier, partitionSet, ignoreIfNotExists = ifExists, purge = purge)
+ } else {
+ val normalizedSpecs = specs.map { expr =>
+ val spec = splitConjunctivePredicates(expr).map {
+ case BinaryComparison(AttributeReference(name, _, _, _), right) => name -> right.toString
+ }.toMap
+ PartitioningUtils.normalizePartitionSpec(
+ spec,
+ table.partitionColumnNames,
+ table.identifier.quotedString,
+ resolver)
+ }
+ catalog.dropPartitions(
+ table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge)
+ }
Seq.empty[Row]
}
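For readers who want the new control flow in one place, here is a minimal, hedged sketch of the two paths `AlterTableDropPartitionCommand` takes above. The object and method names (`DropPartitionSketch`, `dropByExpressions`) are invented for illustration; the catalog calls are the same ones used in the diff:

``` scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.expressions._

// Hypothetical helper, for illustration only.
object DropPartitionSketch {
  def dropByExpressions(
      catalog: SessionCatalog,
      table: TableIdentifier,
      specs: Seq[Expression], // one parsed PARTITION (...) clause each
      ifExists: Boolean,
      purge: Boolean): Unit = {
    // A spec is a "range" spec if it uses any comparator other than plain '='.
    def isRange(expr: Expression): Boolean =
      expr.find(e => e.isInstanceOf[BinaryComparison] && !e.isInstanceOf[EqualTo]).isDefined

    if (specs.exists(isRange)) {
      // Range path: resolve each filter to the concrete matching partitions
      // via the catalog, then drop the deduplicated union in a single call.
      val matched = specs.flatMap { spec =>
        catalog.listPartitionsByFilter(table, Seq(spec)).map(_.spec)
      }.distinct
      catalog.dropPartitions(table, matched, ignoreIfNotExists = ifExists, purge = purge)
    } else {
      // Equality path (pre-existing behavior): a conjunction of '=' comparisons
      // collapses back into a column -> value map. Column-name normalization
      // against the table's partition columns is omitted in this sketch.
      val maps = specs.map { expr =>
        expr.collect {
          case EqualTo(AttributeReference(name, _, _, _), value: Literal) =>
            name -> value.toString
        }.toMap
      }
      catalog.dropPartitions(table, maps, ignoreIfNotExists = ifExists, purge = purge)
    }
  }
}
```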
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 4f19a2d00b..e81512d1ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -215,8 +215,14 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
if (overwrite.enabled) {
val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
if (deletedPartitions.nonEmpty) {
+ import org.apache.spark.sql.catalyst.expressions._
+ val expressions = deletedPartitions.map { specs =>
+ specs.map { case (key, value) =>
+ EqualTo(AttributeReference(key, StringType)(), Literal.create(value, StringType))
+ }.reduceLeft(And)
+ }.toSeq
AlterTableDropPartitionCommand(
- l.catalogTable.get.identifier, deletedPartitions.toSeq,
+ l.catalogTable.get.identifier, expressions,
ifExists = true, purge = true).run(t.sparkSession)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index d31e7aeb3a..057528bef5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -21,6 +21,7 @@ import scala.reflect.{classTag, ClassTag}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.Project
@@ -612,8 +613,12 @@ class DDLCommandSuite extends PlanTest {
val expected1_table = AlterTableDropPartitionCommand(
tableIdent,
Seq(
- Map("dt" -> "2008-08-08", "country" -> "us"),
- Map("dt" -> "2009-09-09", "country" -> "uk")),
+ And(
+ EqualTo(AttributeReference("dt", StringType)(), Literal.create("2008-08-08", StringType)),
+ EqualTo(AttributeReference("country", StringType)(), Literal.create("us", StringType))),
+ And(
+ EqualTo(AttributeReference("dt", StringType)(), Literal.create("2009-09-09", StringType)),
+ EqualTo(AttributeReference("country", StringType)(), Literal.create("uk", StringType)))),
ifExists = true,
purge = false)
val expected2_table = expected1_table.copy(ifExists = false)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 6efae13ddf..a2b04863d3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -225,6 +226,108 @@ class HiveDDLSuite
}
}
+ test("SPARK-17732: Drop partitions by filter") {
+ withTable("sales") {
+ sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
+
+ for (country <- Seq("US", "CA", "KR")) {
+ for (quarter <- 1 to 4) {
+ sql(s"ALTER TABLE sales ADD PARTITION (country = '$country', quarter = '$quarter')")
+ }
+ }
+
+ sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter > '2')")
+ checkAnswer(sql("SHOW PARTITIONS sales"),
+ Row("country=CA/quarter=1") ::
+ Row("country=CA/quarter=2") ::
+ Row("country=KR/quarter=1") ::
+ Row("country=KR/quarter=2") ::
+ Row("country=KR/quarter=3") ::
+ Row("country=KR/quarter=4") ::
+ Row("country=US/quarter=1") ::
+ Row("country=US/quarter=2") ::
+ Row("country=US/quarter=3") ::
+ Row("country=US/quarter=4") :: Nil)
+
+ sql("ALTER TABLE sales DROP PARTITION (country < 'KR'), PARTITION (quarter <= '1')")
+ checkAnswer(sql("SHOW PARTITIONS sales"),
+ Row("country=KR/quarter=2") ::
+ Row("country=KR/quarter=3") ::
+ Row("country=KR/quarter=4") ::
+ Row("country=US/quarter=2") ::
+ Row("country=US/quarter=3") ::
+ Row("country=US/quarter=4") :: Nil)
+
+ sql("ALTER TABLE sales DROP PARTITION (country='KR', quarter='4')")
+ sql("ALTER TABLE sales DROP PARTITION (country='US', quarter='3')")
+ checkAnswer(sql("SHOW PARTITIONS sales"),
+ Row("country=KR/quarter=2") ::
+ Row("country=KR/quarter=3") ::
+ Row("country=US/quarter=2") ::
+ Row("country=US/quarter=4") :: Nil)
+
+ sql("ALTER TABLE sales DROP PARTITION (quarter <= 2), PARTITION (quarter >= '4')")
+ checkAnswer(sql("SHOW PARTITIONS sales"),
+ Row("country=KR/quarter=3") :: Nil)
+
+ // According to the declarative partition spec definitions, this drops the union of target
+ // partitions without exceptions. Hive raises exceptions because it handles them sequentially.
+ sql("ALTER TABLE sales DROP PARTITION (quarter <= 4), PARTITION (quarter <= '3')")
+ checkAnswer(sql("SHOW PARTITIONS sales"), Nil)
+ }
+ }
+
+ test("SPARK-17732: Error handling for drop partitions by filter") {
+ withTable("sales") {
+ sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
+
+ val m = intercept[AnalysisException] {
+ sql("ALTER TABLE sales DROP PARTITION (unknown = 'KR')")
+ }.getMessage
+ assert(m.contains("unknown is not a valid partition column in table"))
+
+ val m2 = intercept[AnalysisException] {
+ sql("ALTER TABLE sales DROP PARTITION (unknown < 'KR')")
+ }.getMessage
+ assert(m2.contains("unknown is not a valid partition column in table"))
+
+ val m3 = intercept[AnalysisException] {
+ sql("ALTER TABLE sales DROP PARTITION (unknown <=> 'KR')")
+ }.getMessage
+ assert(m3.contains("'<=>' operator is not allowed in partition specification"))
+
+ val m4 = intercept[ParseException] {
+ sql("ALTER TABLE sales DROP PARTITION (unknown <=> upper('KR'))")
+ }.getMessage
+ assert(m4.contains("'<=>' operator is not allowed in partition specification"))
+
+ val m5 = intercept[ParseException] {
+ sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter)")
+ }.getMessage
+ assert(m5.contains("Invalid partition filter specification"))
+
+ sql(s"ALTER TABLE sales ADD PARTITION (country = 'KR', quarter = '3')")
+ val m6 = intercept[AnalysisException] {
+ sql("ALTER TABLE sales DROP PARTITION (quarter <= '4'), PARTITION (quarter <= '2')")
+ }.getMessage
+ // The query is not executed because `PARTITION (quarter <= '2')` is invalid.
+ checkAnswer(sql("SHOW PARTITIONS sales"),
+ Row("country=KR/quarter=3") :: Nil)
+ assert(m6.contains("There is no partition for (`quarter` <= '2')"))
+ }
+ }
+
+ test("SPARK-17732: Partition filter is not allowed in ADD PARTITION") {
+ withTable("sales") {
+ sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
+
+ val m = intercept[ParseException] {
+ sql("ALTER TABLE sales ADD PARTITION (country = 'US', quarter < '1')")
+ }.getMessage()
+ assert(m.contains("Invalid partition filter specification"))
+ }
+ }
+
test("drop views") {
withTable("tab1") {
val tabName = "tab1"