author    Andrew Or <andrew@databricks.com>  2016-05-26 19:01:41 -0700
committer Andrew Or <andrew@databricks.com>  2016-05-26 19:01:41 -0700
commit    008a5377d57ce6692eca4a41539fb27978b58e01 (patch)
tree      1ef7e874069c6b1536735d4f39511c3905c9e879 /sql
parent    3ac2363d757cc9cebc627974f17ecda3a263efdf (diff)
[SPARK-15538][SPARK-15539][SQL] Truncate table fixes round 2
## What changes were proposed in this pull request?

Two more changes:
(1) Fix truncate table for data source tables (only for cases without `PARTITION`)
(2) Disallow truncating external tables or views

## How was this patch tested?

`DDLSuite`

Author: Andrew Or <andrew@databricks.com>

Closes #13315 from andrewor14/truncate-table.
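To make the two changes concrete, here is a hedged spark-shell-style sketch of how they surface to users. The session setup and table names are hypothetical, and the commented-out failing calls paraphrase the checks added in the diff below; nothing here is taken verbatim from the patch:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical local session and table names, for illustration only.
val spark = SparkSession.builder().master("local[*]").appName("truncate-demo").getOrCreate()
import spark.implicits._

// (1) Truncating a non-partitioned data source table now clears its data:
Seq((1, 1), (2, 2)).toDF("width", "length").write.saveAsTable("rects")
spark.sql("TRUNCATE TABLE rects")
assert(spark.table("rects").collect().isEmpty)

// ...but TRUNCATE TABLE ... PARTITION on a data source table is rejected:
// spark.sql("TRUNCATE TABLE rects PARTITION (width=1)")   // AnalysisException

// (2) External tables and views may no longer be truncated:
// spark.sql("TRUNCATE TABLE an_external_table")           // AnalysisException
// spark.sql("TRUNCATE TABLE a_view")                      // AnalysisException
```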
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala | 78
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala | 34
2 files changed, 86 insertions(+), 26 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index bef4c9222c..e34beec33d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -285,41 +285,67 @@ case class TruncateTableCommand(
     tableName: TableIdentifier,
     partitionSpec: Option[TablePartitionSpec]) extends RunnableCommand {
 
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    val catalog = sparkSession.sessionState.catalog
+  override def run(spark: SparkSession): Seq[Row] = {
+    val catalog = spark.sessionState.catalog
     if (!catalog.tableExists(tableName)) {
       throw new AnalysisException(s"Table '$tableName' in TRUNCATE TABLE does not exist.")
-    } else if (catalog.isTemporaryTable(tableName)) {
+    }
+    if (catalog.isTemporaryTable(tableName)) {
       throw new AnalysisException(
         s"Operation not allowed: TRUNCATE TABLE on temporary tables: '$tableName'")
-    } else {
-      val locations = if (partitionSpec.isDefined) {
-        catalog.listPartitions(tableName, partitionSpec).map(_.storage.locationUri)
+    }
+    val table = catalog.getTableMetadata(tableName)
+    if (table.tableType == CatalogTableType.EXTERNAL) {
+      throw new AnalysisException(
+        s"Operation not allowed: TRUNCATE TABLE on external tables: '$tableName'")
+    }
+    if (table.tableType == CatalogTableType.VIEW) {
+      throw new AnalysisException(
+        s"Operation not allowed: TRUNCATE TABLE on views: '$tableName'")
+    }
+    val isDatasourceTable = DDLUtils.isDatasourceTable(table)
+    if (isDatasourceTable && partitionSpec.isDefined) {
+      throw new AnalysisException(
+        s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
+        s"for tables created using the data sources API: '$tableName'")
+    }
+    if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
+      throw new AnalysisException(
+        s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
+        s"for tables that are not partitioned: '$tableName'")
+    }
+    val locations =
+      if (isDatasourceTable || table.partitionColumnNames.isEmpty) {
+        Seq(table.storage.locationUri)
       } else {
-        val table = catalog.getTableMetadata(tableName)
-        if (table.partitionColumnNames.nonEmpty) {
-          catalog.listPartitions(tableName).map(_.storage.locationUri)
-        } else {
-          Seq(table.storage.locationUri)
-        }
+        catalog.listPartitions(tableName, partitionSpec).map(_.storage.locationUri)
       }
-      val hadoopConf = sparkSession.sessionState.newHadoopConf()
-      locations.foreach { location =>
-        if (location.isDefined) {
-          val path = new Path(location.get)
-          try {
-            val fs = path.getFileSystem(hadoopConf)
-            fs.delete(path, true)
-            fs.mkdirs(path)
-          } catch {
-            case NonFatal(e) =>
-              throw new AnalysisException(
-                s"Failed to truncate table '$tableName' when removing data of the path: $path " +
-                s"because of ${e.toString}")
-          }
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    locations.foreach { location =>
+      if (location.isDefined) {
+        val path = new Path(location.get)
+        try {
+          val fs = path.getFileSystem(hadoopConf)
+          fs.delete(path, true)
+          fs.mkdirs(path)
+        } catch {
+          case NonFatal(e) =>
+            throw new AnalysisException(
+              s"Failed to truncate table '$tableName' when removing data of the path: $path " +
+              s"because of ${e.toString}")
         }
       }
     }
+    // After deleting the data, invalidate the table to make sure we don't keep around a stale
+    // file relation in the metastore cache.
+    spark.sessionState.invalidateTable(tableName.unquotedString)
+    // Also try to drop the contents of the table from the columnar cache
+    try {
+      spark.sharedState.cacheManager.tryUncacheQuery(spark.table(tableName.quotedString))
+    } catch {
+      case NonFatal(e) =>
+        log.warn(s"Exception when attempting to uncache table '$tableName'", e)
+    }
     Seq.empty[Row]
   }
 }
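The heart of change (1) is the new rule for choosing which directories to delete and recreate. Below is a minimal standalone restatement of that rule, using simplified stand-in types rather than Spark's internal catalog API:

```scala
// Simplified stand-ins for catalog metadata; these are NOT Spark's types.
case class TableInfo(
    location: Option[String],                 // table-level storage location
    partitionLocations: Seq[Option[String]],  // matching partitions' locations
    partitionColumns: Seq[String],
    isDatasource: Boolean)

// Mirrors the `val locations = ...` logic above: data source tables and
// non-partitioned tables are truncated at the table root; otherwise each
// matching partition directory is deleted and recreated.
def locationsToTruncate(table: TableInfo): Seq[Option[String]] =
  if (table.isDatasource || table.partitionColumns.isEmpty) Seq(table.location)
  else table.partitionLocations
```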
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index bddd3f2119..6c038c7735 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.util.Utils
 
 class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   private val escapedIdentifier = "`(.+)`".r
@@ -1109,4 +1110,37 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       }
     }
   }
+
+  test("truncate table - datasource table") {
+    import testImplicits._
+    val data = (1 to 10).map { i => (i, i) }.toDF("width", "length")
+    data.write.saveAsTable("rectangles")
+    spark.catalog.cacheTable("rectangles")
+    assume(spark.table("rectangles").collect().nonEmpty, "bad test; table was empty to begin with")
+    assume(spark.catalog.isCached("rectangles"), "bad test; table was not cached to begin with")
+    sql("TRUNCATE TABLE rectangles")
+    assert(spark.table("rectangles").collect().isEmpty)
+    assert(!spark.catalog.isCached("rectangles"))
+    // truncating partitioned data source tables is not supported
+    data.write.partitionBy("length").saveAsTable("rectangles2")
+    assertUnsupported("TRUNCATE TABLE rectangles PARTITION (width=1)")
+    assertUnsupported("TRUNCATE TABLE rectangles2 PARTITION (width=1)")
+  }
+
+  test("truncate table - external table, temporary table, view (not allowed)") {
+    import testImplicits._
+    val path = Utils.createTempDir().getAbsolutePath
+    (1 to 10).map { i => (i, i) }.toDF("a", "b").createTempView("my_temp_tab")
+    sql(s"CREATE EXTERNAL TABLE my_ext_tab LOCATION '$path'")
+    sql(s"CREATE VIEW my_view AS SELECT 1")
+    assertUnsupported("TRUNCATE TABLE my_temp_tab")
+    assertUnsupported("TRUNCATE TABLE my_ext_tab")
+    assertUnsupported("TRUNCATE TABLE my_view")
+  }
+
+  test("truncate table - non-partitioned table (not allowed)") {
+    sql("CREATE TABLE my_tab (age INT, name STRING)")
+    assertUnsupported("TRUNCATE TABLE my_tab PARTITION (age=10)")
+  }
+
 }
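A note on the tests: `assertUnsupported` is a pre-existing helper in DDLSuite, not something this patch adds. From memory it looks roughly like the sketch below, so each call above just verifies that the statement fails analysis with an "operation not allowed" error:

```scala
// Paraphrased sketch of DDLSuite's existing helper; the real body may differ.
private def assertUnsupported(query: String): Unit = {
  val e = intercept[AnalysisException] { sql(query) }
  assert(e.getMessage.toLowerCase.contains("operation not allowed"))
}
```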