author    Wenchen Fan <wenchen@databricks.com>           2016-11-28 08:46:00 -0800
committer Herman van Hovell <hvanhovell@databricks.com>  2016-11-28 08:46:00 -0800
commit    d31ff9b7caf4eba66724947b68f517072e6a011c (patch)
tree      13c774c25351ba915d31346481da197f68512a92 /sql/core
parent    38e29824d9a50464daa397c28e89610ed0aed4b6 (diff)
[SPARK-17732][SQL] Revert ALTER TABLE DROP PARTITION should support comparators
## What changes were proposed in this pull request?

The change from https://github.com/apache/spark/pull/15704 fails if an int literal is used in `DROP PARTITION`, and it has already been reverted in branch-2.1. This PR reverts it in the master branch as well, and adds a regression test for it, to make sure the master branch stays healthy.

## How was this patch tested?

New regression test.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16036 from cloud-fan/revert.
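For context, a minimal reproduction of the regression, mirroring the test added to `DDLSuite` below (assumes a `SparkSession` named `spark` with Hive support; the table name is illustrative):

```scala
// Sketch of the failing scenario: a table partitioned by int columns.
spark.sql("CREATE TABLE tab1 (col INT) PARTITIONED BY (a INT, b INT)")
spark.sql("ALTER TABLE tab1 ADD PARTITION (a=9, b=9)")
// Before this revert, unquoted int literals in the partition spec failed
// here; the quoted form (a='9', b='9') was unaffected.
spark.sql("ALTER TABLE tab1 DROP PARTITION (a=9, b=9)")
```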
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala                 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala                   51
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala  8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala         9
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala               14
5 files changed, 25 insertions, 59 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 0300bfe1ec..5f89a229d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -832,7 +832,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     }
     AlterTableDropPartitionCommand(
       visitTableIdentifier(ctx.tableIdentifier),
-      ctx.partitionSpec.asScala.map(visitPartitionFilterSpec),
+      ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
       ctx.EXISTS != null,
       ctx.PURGE != null)
   }
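After this one-line change, each `PARTITION (...)` clause is parsed by `visitNonOptionalPartitionSpec` into a plain `TablePartitionSpec` (the `Map[String, String]` alias from `CatalogTypes`, imported in `ddl.scala` below) rather than into a filter `Expression`. A rough sketch of the command the parser now builds, reusing the spec values from `DDLCommandSuite` below (the database and table names are illustrative):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.AlterTableDropPartitionCommand

// One Map per PARTITION clause: partition column name -> literal value,
// with the value kept as a string even for int-typed columns.
val cmd = AlterTableDropPartitionCommand(
  TableIdentifier("tab1", Some("dbx")),
  Seq(Map("dt" -> "2008-08-08", "country" -> "us")),
  ifExists = true,
  purge = false)
```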
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index d80b000bcc..0f126d0200 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -31,8 +31,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryComparison}
-import org.apache.spark.sql.catalyst.expressions.{EqualTo, Expression, PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
@@ -420,55 +419,27 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[Expression],
+    specs: Seq[TablePartitionSpec],
     ifExists: Boolean,
     purge: Boolean)
-  extends RunnableCommand with PredicateHelper {
-
-  private def isRangeComparison(expr: Expression): Boolean = {
-    expr.find(e => e.isInstanceOf[BinaryComparison] && !e.isInstanceOf[EqualTo]).isDefined
-  }
+  extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
-    val resolver = sparkSession.sessionState.conf.resolver
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    specs.foreach { expr =>
-      expr.references.foreach { attr =>
-        if (!table.partitionColumnNames.exists(resolver(_, attr.name))) {
-          throw new AnalysisException(s"${attr.name} is not a valid partition column " +
-            s"in table ${table.identifier.quotedString}.")
-        }
-      }
+    val normalizedSpecs = specs.map { spec =>
+      PartitioningUtils.normalizePartitionSpec(
+        spec,
+        table.partitionColumnNames,
+        table.identifier.quotedString,
+        sparkSession.sessionState.conf.resolver)
     }
 
-    if (specs.exists(isRangeComparison)) {
-      val partitionSet = specs.flatMap { spec =>
-        val partitions = catalog.listPartitionsByFilter(table.identifier, Seq(spec)).map(_.spec)
-        if (partitions.isEmpty && !ifExists) {
-          throw new AnalysisException(s"There is no partition for ${spec.sql}")
-        }
-        partitions
-      }.distinct
-      catalog.dropPartitions(
-        table.identifier, partitionSet, ignoreIfNotExists = ifExists, purge = purge)
-    } else {
-      val normalizedSpecs = specs.map { expr =>
-        val spec = splitConjunctivePredicates(expr).map {
-          case BinaryComparison(AttributeReference(name, _, _, _), right) => name -> right.toString
-        }.toMap
-        PartitioningUtils.normalizePartitionSpec(
-          spec,
-          table.partitionColumnNames,
-          table.identifier.quotedString,
-          resolver)
-      }
-      catalog.dropPartitions(
-        table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge)
-    }
+    catalog.dropPartitions(
+      table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge)
 
     Seq.empty[Row]
   }
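`PartitioningUtils.normalizePartitionSpec` maps each user-supplied key onto the table's declared partition columns via the session resolver (case-insensitive by default) and rejects keys that match no partition column. A simplified sketch of that behavior, not Spark's exact implementation:

```scala
// Simplified sketch; the real method also detects duplicate keys and
// throws AnalysisException, quoting the table name in the message.
def normalizePartitionSpec(
    spec: Map[String, String],
    partColNames: Seq[String],
    resolver: (String, String) => Boolean): Map[String, String] = {
  spec.map { case (key, value) =>
    val normalizedKey = partColNames.find(resolver(_, key)).getOrElse {
      sys.error(s"$key is not a valid partition column")
    }
    normalizedKey -> value  // rewrite the key to the declared column name
  }
}
```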
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index e81512d1ab..4f19a2d00b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -215,14 +215,8 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
         if (overwrite.enabled) {
           val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
           if (deletedPartitions.nonEmpty) {
-            import org.apache.spark.sql.catalyst.expressions._
-            val expressions = deletedPartitions.map { specs =>
-              specs.map { case (key, value) =>
-                EqualTo(AttributeReference(key, StringType)(), Literal.create(value, StringType))
-              }.reduceLeft(And)
-            }.toSeq
             AlterTableDropPartitionCommand(
-              l.catalogTable.get.identifier, expressions,
+              l.catalogTable.get.identifier, deletedPartitions.toSeq,
               ifExists = true, purge = true).run(t.sparkSession)
           }
         }
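Since `deletedPartitions` is already a set of `TablePartitionSpec` maps, the overwrite path can now pass it straight through; the removed code had to lift every map entry into an `EqualTo` predicate and fold the entries with `And` first. The shape of the data that flows through after the change (values are illustrative):

```scala
// A Set[TablePartitionSpec], i.e. Set[Map[String, String]]; no Expression
// construction is needed before handing it to the drop command.
val deletedPartitions: Set[Map[String, String]] =
  Set(Map("a" -> "1", "b" -> "5"), Map("a" -> "2", "b" -> "6"))
val specs = deletedPartitions.toSeq
```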
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 057528bef5..d31e7aeb3a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -21,7 +21,6 @@ import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.Project
@@ -613,12 +612,8 @@ class DDLCommandSuite extends PlanTest {
     val expected1_table = AlterTableDropPartitionCommand(
       tableIdent,
       Seq(
-        And(
-          EqualTo(AttributeReference("dt", StringType)(), Literal.create("2008-08-08", StringType)),
-          EqualTo(AttributeReference("country", StringType)(), Literal.create("us", StringType))),
-        And(
-          EqualTo(AttributeReference("dt", StringType)(), Literal.create("2009-09-09", StringType)),
-          EqualTo(AttributeReference("country", StringType)(), Literal.create("uk", StringType)))),
+        Map("dt" -> "2008-08-08", "country" -> "us"),
+        Map("dt" -> "2009-09-09", "country" -> "uk")),
       ifExists = true,
       purge = false)
     val expected2_table = expected1_table.copy(ifExists = false)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 10843e9ba5..a602d750d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1281,26 +1281,28 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     val part2 = Map("a" -> "2", "b" -> "6")
     val part3 = Map("a" -> "3", "b" -> "7")
     val part4 = Map("a" -> "4", "b" -> "8")
+    val part5 = Map("a" -> "9", "b" -> "9")
     createDatabase(catalog, "dbx")
     createTable(catalog, tableIdent)
     createTablePartition(catalog, part1, tableIdent)
     createTablePartition(catalog, part2, tableIdent)
     createTablePartition(catalog, part3, tableIdent)
     createTablePartition(catalog, part4, tableIdent)
+    createTablePartition(catalog, part5, tableIdent)
     assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
-      Set(part1, part2, part3, part4))
+      Set(part1, part2, part3, part4, part5))
     if (isDatasourceTable) {
       convertToDatasourceTable(catalog, tableIdent)
     }
 
     // basic drop partition
     sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (a='4', b='8'), PARTITION (a='3', b='7')")
-    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2))
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part5))
 
     // drop partitions without explicitly specifying database
     catalog.setCurrentDatabase("dbx")
     sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='2', b ='6')")
-    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part5))
 
     // table to alter does not exist
     intercept[AnalysisException] {
@@ -1314,10 +1316,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
 
     // partition to drop does not exist when using IF EXISTS
     sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='300')")
-    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part5))
 
     // partition spec in DROP PARTITION should be case insensitive by default
     sql("ALTER TABLE tab1 DROP PARTITION (A='1', B='5')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part5))
+
+    // use int literal as partition value for int type partition column
+    sql("ALTER TABLE tab1 DROP PARTITION (a=9, b=9)")
     assert(catalog.listPartitions(tableIdent).isEmpty)
   }