author     Wenchen Fan <wenchen@databricks.com>    2016-11-08 22:28:29 +0800
committer  Wenchen Fan <wenchen@databricks.com>    2016-11-08 22:28:29 +0800
commit     73feaa30ebfb62c81c7ce2c60ce2163611dd8852 (patch)
tree       f7053406a7f29d8851920d9acef96de3ccce4ee4
parent     344dcad70173abcb348c68fdb0219960b5b06635 (diff)
[SPARK-18346][SQL] TRUNCATE TABLE should fail if no partition is matched for the given non-partial partition spec
## What changes were proposed in this pull request?

a follow up of https://github.com/apache/spark/pull/15688

## How was this patch tested?

updated test in `DDLSuite`

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15805 from cloud-fan/truncate.
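For illustration only (not part of the commit): a minimal sketch of the behavior change, e.g. pasted into a `spark-shell` built from this branch with Hive support, against a hypothetical table `partTable` partitioned by `width` and `length`. A partial, non-matching spec is still a no-op, but a fully specified spec that matches no partition now fails:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException

// Sketch only: TRUNCATE TABLE ... PARTITION requires a Hive-style partition provider,
// so this assumes a session with Hive support enabled.
val spark = SparkSession.builder().master("local[*]").enableHiveSupport().getOrCreate()

spark.sql("CREATE TABLE partTable (id INT) PARTITIONED BY (width INT, length INT)")
spark.sql("INSERT INTO TABLE partTable PARTITION (width=1, length=1) SELECT 0")

// Partial spec with no matching partition: still silently does nothing.
spark.sql("TRUNCATE TABLE partTable PARTITION (width=100)")

// Fully specified (non-partial) spec with no matching partition: now throws
// NoSuchPartitionException instead of doing nothing.
try {
  spark.sql("TRUNCATE TABLE partTable PARTITION (width=100, length=100)")
} catch {
  case e: NoSuchPartitionException => println(s"As expected: ${e.getMessage}")
}
```

As the removed code comment in the diff below notes, Spark previously diverged from Hive here; this change makes the fully-specified case match Hive's behavior of raising an error.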
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala    33
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala    9
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala  12
3 files changed, 30 insertions(+), 24 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 3cfa639a2f..3a856fa0f5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -30,13 +30,13 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.execution.datasources.PartitioningUtils
-import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -324,38 +324,47 @@ case class TruncateTableCommand(
override def run(spark: SparkSession): Seq[Row] = {
val catalog = spark.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
- val tableIdentwithDB = table.identifier.quotedString
+ val tableIdentWithDB = table.identifier.quotedString
if (table.tableType == CatalogTableType.EXTERNAL) {
throw new AnalysisException(
- s"Operation not allowed: TRUNCATE TABLE on external tables: $tableIdentwithDB")
+ s"Operation not allowed: TRUNCATE TABLE on external tables: $tableIdentWithDB")
}
if (table.tableType == CatalogTableType.VIEW) {
throw new AnalysisException(
- s"Operation not allowed: TRUNCATE TABLE on views: $tableIdentwithDB")
+ s"Operation not allowed: TRUNCATE TABLE on views: $tableIdentWithDB")
}
if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
throw new AnalysisException(
s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
- s"for tables that are not partitioned: $tableIdentwithDB")
+ s"for tables that are not partitioned: $tableIdentWithDB")
}
if (partitionSpec.isDefined) {
DDLUtils.verifyPartitionProviderIsHive(spark, table, "TRUNCATE TABLE ... PARTITION")
}
+
+ val partCols = table.partitionColumnNames
val locations =
- if (table.partitionColumnNames.isEmpty) {
+ if (partCols.isEmpty) {
Seq(table.storage.locationUri)
} else {
- // Here we diverge from Hive when the given partition spec contains all partition columns
- // but no partition is matched: Hive will throw an exception and we just do nothing.
val normalizedSpec = partitionSpec.map { spec =>
PartitioningUtils.normalizePartitionSpec(
spec,
- table.partitionColumnNames,
+ partCols,
table.identifier.quotedString,
spark.sessionState.conf.resolver)
}
- catalog.listPartitions(table.identifier, normalizedSpec).map(_.storage.locationUri)
+ val partLocations =
+ catalog.listPartitions(table.identifier, normalizedSpec).map(_.storage.locationUri)
+
+ // Fail if the partition spec is fully specified (not partial) and the partition does not
+ // exist.
+ for (spec <- partitionSpec if partLocations.isEmpty && spec.size == partCols.length) {
+ throw new NoSuchPartitionException(table.database, table.identifier.table, spec)
+ }
+
+ partLocations
}
val hadoopConf = spark.sessionState.newHadoopConf()
locations.foreach { location =>
@@ -368,7 +377,7 @@ case class TruncateTableCommand(
} catch {
case NonFatal(e) =>
throw new AnalysisException(
- s"Failed to truncate table $tableIdentwithDB when removing data of the path: $path " +
+ s"Failed to truncate table $tableIdentWithDB when removing data of the path: $path " +
s"because of ${e.toString}")
}
}
@@ -381,7 +390,7 @@ case class TruncateTableCommand(
spark.sharedState.cacheManager.uncacheQuery(spark.table(table.identifier))
} catch {
case NonFatal(e) =>
- log.warn(s"Exception when attempting to uncache table $tableIdentwithDB", e)
+ log.warn(s"Exception when attempting to uncache table $tableIdentWithDB", e)
}
Seq.empty[Row]
}
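For reference, a standalone sketch of the check introduced above (identifiers are illustrative, not helpers from the patch): a spec is treated as non-partial when it binds every partition column, and only then does an empty `listPartitions` result become an error.

```scala
// Illustrative restatement of the new rule in TruncateTableCommand.
def checkFullySpecifiedSpec(
    partCols: Seq[String],         // the table's partition columns
    spec: Map[String, String],     // the user-supplied, normalized partition spec
    partLocations: Seq[Option[String]]): Unit = {  // locations of matched partitions
  val specIsFull = spec.size == partCols.length
  if (specIsFull && partLocations.isEmpty) {
    // The command itself throws NoSuchPartitionException(db, table, spec) at this point.
    throw new IllegalArgumentException(s"No partition matches the fully specified spec: $spec")
  }
}
```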
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 864af8d578..df3a3c34c3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1673,11 +1673,10 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
sql("TRUNCATE TABLE partTable PARTITION (width=100)")
assert(spark.table("partTable").count() == data.count())
- // do nothing if no partition is matched for the given non-partial partition spec
- // TODO: This behaviour is different from Hive, we should decide whether we need to follow
- // Hive's behaviour or stick with our existing behaviour later.
- sql("TRUNCATE TABLE partTable PARTITION (width=100, length=100)")
- assert(spark.table("partTable").count() == data.count())
+ // throw exception if no partition is matched for the given non-partial partition spec.
+ intercept[NoSuchPartitionException] {
+ sql("TRUNCATE TABLE partTable PARTITION (width=100, length=100)")
+ }
// throw exception if the column in partition spec is not a partition column.
val e = intercept[AnalysisException] {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 4150e649be..0076a77868 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -23,11 +23,10 @@ import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
-import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
@@ -1149,11 +1148,10 @@ class HiveDDLSuite
sql("TRUNCATE TABLE partTable PARTITION (width=100)")
assert(spark.table("partTable").count() == data.count())
- // do nothing if no partition is matched for the given non-partial partition spec
- // TODO: This behaviour is different from Hive, we should decide whether we need to follow
- // Hive's behaviour or stick with our existing behaviour later.
- sql("TRUNCATE TABLE partTable PARTITION (width=100, length=100)")
- assert(spark.table("partTable").count() == data.count())
+ // throw exception if no partition is matched for the given non-partial partition spec.
+ intercept[NoSuchPartitionException] {
+ sql("TRUNCATE TABLE partTable PARTITION (width=100, length=100)")
+ }
// throw exception if the column in partition spec is not a partition column.
val e = intercept[AnalysisException] {