author    Marcelo Vanzin <vanzin@cloudera.com>  2016-07-12 12:47:46 -0700
committer Marcelo Vanzin <vanzin@cloudera.com>  2016-07-12 12:47:46 -0700
commit    7f968867ff61c6b1a007874ee7e3a7421d94d373 (patch)
tree      c1d3cc77fc215789323ddcac25b6e3c571d3370f /sql/hive
parent    68df47aca55e99406b7b67ef3d4b1008abf1b8b6 (diff)
download  spark-7f968867ff61c6b1a007874ee7e3a7421d94d373.tar.gz
          spark-7f968867ff61c6b1a007874ee7e3a7421d94d373.tar.bz2
          spark-7f968867ff61c6b1a007874ee7e3a7421d94d373.zip
[SPARK-16119][SQL] Support PURGE option to drop table / partition.
This option is used by Hive to directly delete the files instead of moving them to the trash. This is needed in certain configurations where moving the files does not work. For non-Hive tables and partitions, Spark already behaves as if the PURGE option was set, so there's no need to do anything.

Hive support for PURGE was added in 0.14 (for tables) and 1.2 (for partitions), so the code reflects that: trying to use the option with older versions of Hive will cause an exception to be thrown.

The change is a little noisier than I would like, because of the code to propagate the new flag through all the interfaces and implementations; the main changes are in the parser and in HiveShim, aside from the tests (DDLCommandSuite, VersionsSuite).

Tested by running sql and catalyst unit tests, plus VersionsSuite which has been updated to test the version-specific behavior. I also ran an internal test suite that uses PURGE and would not pass previously.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #13831 from vanzin/SPARK-16119.
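A minimal usage sketch of the syntax this change enables, not part of the patch itself; the session setup and table/partition names below are hypothetical:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical session; PURGE only matters for Hive-backed tables.
val spark = SparkSession.builder()
  .appName("purge-example")
  .enableHiveSupport()
  .getOrCreate()

// Delete the table's files directly instead of moving them to the trash.
// Requires Hive 0.14+; older client versions throw UnsupportedOperationException.
spark.sql("DROP TABLE example_table PURGE")

// Same idea for a partition; requires Hive 1.2+.
spark.sql("ALTER TABLE example_part_table DROP PARTITION (ds = '2016-07-12') PURGE")
```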
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala                      |  10
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala                        |   5
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala                    |  10
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala                          | 100
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala |   3
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala                          |   2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala                          |   2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala                     |  29
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala                        |   4
9 files changed, 146 insertions, 19 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index b8bc9ab900..cf2b92fb89 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -192,9 +192,10 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu
override def dropTable(
db: String,
table: String,
- ignoreIfNotExists: Boolean): Unit = withClient {
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit = withClient {
requireDbExists(db)
- client.dropTable(db, table, ignoreIfNotExists)
+ client.dropTable(db, table, ignoreIfNotExists, purge)
}
override def renameTable(db: String, oldName: String, newName: String): Unit = withClient {
@@ -295,9 +296,10 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu
db: String,
table: String,
parts: Seq[TablePartitionSpec],
- ignoreIfNotExists: Boolean): Unit = withClient {
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit = withClient {
requireTableExists(db, table)
- client.dropPartitions(db, table, parts, ignoreIfNotExists)
+ client.dropPartitions(db, table, parts, ignoreIfNotExists, purge)
}
override def renamePartitions(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 5f89696918..6f009d714b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -80,7 +80,7 @@ private[hive] trait HiveClient {
def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
/** Drop the specified table. */
- def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit
+ def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit
/** Alter a table whose name matches the one specified in `table`, assuming it exists. */
final def alterTable(table: CatalogTable): Unit = alterTable(table.identifier.table, table)
@@ -121,7 +121,8 @@ private[hive] trait HiveClient {
db: String,
table: String,
specs: Seq[TablePartitionSpec],
- ignoreIfNotExists: Boolean): Unit
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit
/**
* Rename one or many existing table partitions, assuming they exist.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 1c89d8c62a..7e0cef3e35 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -406,8 +406,9 @@ private[hive] class HiveClientImpl(
override def dropTable(
dbName: String,
tableName: String,
- ignoreIfNotExists: Boolean): Unit = withHiveState {
- client.dropTable(dbName, tableName, true, ignoreIfNotExists)
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit = withHiveState {
+ shim.dropTable(client, dbName, tableName, true, ignoreIfNotExists, purge)
}
override def alterTable(tableName: String, table: CatalogTable): Unit = withHiveState {
@@ -429,7 +430,8 @@ private[hive] class HiveClientImpl(
db: String,
table: String,
specs: Seq[TablePartitionSpec],
- ignoreIfNotExists: Boolean): Unit = withHiveState {
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit = withHiveState {
// TODO: figure out how to drop multiple partitions in one call
val hiveTable = client.getTable(db, table, true /* throw exception */)
// do the check at first and collect all the matching partitions
@@ -450,7 +452,7 @@ private[hive] class HiveClientImpl(
matchingParts.foreach { partition =>
try {
val deleteData = true
- client.dropPartition(db, table, partition, deleteData)
+ shim.dropPartition(client, db, table, partition, deleteData, purge)
} catch {
case e: Exception =>
val remainingParts = matchingParts.toBuffer -- droppedParts
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 9df4a26d55..41527fcd05 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.hive.client
import java.lang.{Boolean => JBoolean, Integer => JInteger, Long => JLong}
-import java.lang.reflect.{Method, Modifier}
+import java.lang.reflect.{InvocationTargetException, Method, Modifier}
import java.net.URI
import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, Set => JSet}
import java.util.concurrent.TimeUnit
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, FunctionResource, FunctionResourceType}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{IntegralType, StringType}
-
+import org.apache.spark.util.Utils
/**
* A shim that defines the interface between [[HiveClientImpl]] and the underlying Hive library used
@@ -129,6 +129,22 @@ private[client] sealed abstract class Shim {
def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit
+ def dropTable(
+ hive: Hive,
+ dbName: String,
+ tableName: String,
+ deleteData: Boolean,
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit
+
+ def dropPartition(
+ hive: Hive,
+ dbName: String,
+ tableName: String,
+ part: JList[String],
+ deleteData: Boolean,
+ purge: Boolean): Unit
+
protected def findStaticMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
val method = findMethod(klass, name, args: _*)
require(Modifier.isStatic(method.getModifiers()),
@@ -343,6 +359,32 @@ private[client] class Shim_v0_12 extends Shim with Logging {
dropIndexMethod.invoke(hive, dbName, tableName, indexName, true: JBoolean)
}
+ override def dropTable(
+ hive: Hive,
+ dbName: String,
+ tableName: String,
+ deleteData: Boolean,
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit = {
+ if (purge) {
+ throw new UnsupportedOperationException("DROP TABLE ... PURGE")
+ }
+ hive.dropTable(dbName, tableName, deleteData, ignoreIfNotExists)
+ }
+
+ override def dropPartition(
+ hive: Hive,
+ dbName: String,
+ tableName: String,
+ part: JList[String],
+ deleteData: Boolean,
+ purge: Boolean): Unit = {
+ if (purge) {
+ throw new UnsupportedOperationException("ALTER TABLE ... DROP PARTITION ... PURGE")
+ }
+ hive.dropPartition(dbName, tableName, part, deleteData)
+ }
+
override def createFunction(hive: Hive, db: String, func: CatalogFunction): Unit = {
throw new AnalysisException("Hive 0.12 doesn't support creating permanent functions. " +
"Please use Hive 0.13 or higher.")
@@ -599,6 +641,15 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE)
+ private lazy val dropTableMethod =
+ findMethod(
+ classOf[Hive],
+ "dropTable",
+ classOf[String],
+ classOf[String],
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE)
private lazy val getTimeVarMethod =
findMethod(
classOf[HiveConf],
@@ -643,6 +694,21 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
numDP: JInteger, holdDDLTime: JBoolean, listBucketingEnabled: JBoolean, JBoolean.FALSE)
}
+ override def dropTable(
+ hive: Hive,
+ dbName: String,
+ tableName: String,
+ deleteData: Boolean,
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit = {
+ try {
+ dropTableMethod.invoke(hive, dbName, tableName, deleteData: JBoolean,
+ ignoreIfNotExists: JBoolean, purge: JBoolean)
+ } catch {
+ case e: InvocationTargetException => throw e.getCause()
+ }
+ }
+
override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = {
getTimeVarMethod.invoke(
conf,
@@ -696,6 +762,19 @@ private[client] class Shim_v1_2 extends Shim_v1_1 {
JBoolean.TYPE,
JLong.TYPE)
+ private lazy val dropOptionsClass =
+ Utils.classForName("org.apache.hadoop.hive.metastore.PartitionDropOptions")
+ private lazy val dropOptionsDeleteData = dropOptionsClass.getField("deleteData")
+ private lazy val dropOptionsPurge = dropOptionsClass.getField("purgeData")
+ private lazy val dropPartitionMethod =
+ findMethod(
+ classOf[Hive],
+ "dropPartition",
+ classOf[String],
+ classOf[String],
+ classOf[JList[String]],
+ dropOptionsClass)
+
override def loadDynamicPartitions(
hive: Hive,
loadPath: Path,
@@ -710,4 +789,21 @@ private[client] class Shim_v1_2 extends Shim_v1_1 {
0L: JLong)
}
+ override def dropPartition(
+ hive: Hive,
+ dbName: String,
+ tableName: String,
+ part: JList[String],
+ deleteData: Boolean,
+ purge: Boolean): Unit = {
+ val dropOptions = dropOptionsClass.newInstance().asInstanceOf[Object]
+ dropOptionsDeleteData.setBoolean(dropOptions, deleteData)
+ dropOptionsPurge.setBoolean(dropOptions, purge)
+ try {
+ dropPartitionMethod.invoke(hive, dbName, tableName, part, dropOptions)
+ } catch {
+ case e: InvocationTargetException => throw e.getCause()
+ }
+ }
+
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 15a5d79dcb..2762e0cdd5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -95,7 +95,8 @@ case class CreateHiveTableAsSelectCommand(
} catch {
case NonFatal(e) =>
// drop the created table.
- sparkSession.sessionState.catalog.dropTable(tableIdentifier, ignoreIfNotExists = true)
+ sparkSession.sessionState.catalog.dropTable(tableIdentifier, ignoreIfNotExists = true,
+ purge = false)
throw e
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
index 8dc756b938..6eeb67510c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
@@ -43,7 +43,7 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft
override def afterAll(): Unit = {
try {
sessionState.catalog.dropTable(
- TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true)
+ TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true, purge = false)
sql("DROP TABLE IF EXISTS HiveListTablesSuiteTable")
sql("DROP TABLE IF EXISTS ListTablesSuiteDB.HiveInDBListTablesSuiteTable")
sql("DROP DATABASE IF EXISTS ListTablesSuiteDB")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index a5975cf483..b275ab17a9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -165,7 +165,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
sql("ANALYZE TABLE tempTable COMPUTE STATISTICS")
}
spark.sessionState.catalog.dropTable(
- TableIdentifier("tempTable"), ignoreIfNotExists = true)
+ TableIdentifier("tempTable"), ignoreIfNotExists = true, purge = false)
}
test("estimates the size of a test MetastoreRelation") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 5b209acf0f..a972f61e25 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -249,7 +249,19 @@ class VersionsSuite extends SparkFunSuite with Logging {
}
test(s"$version: dropTable") {
- client.dropTable("default", tableName = "temporary", ignoreIfNotExists = false)
+ val versionsWithoutPurge = versions.takeWhile(_ != "0.14")
+ // First try with the purge option set. This should fail if the version is < 0.14, in which
+ // case we check the version and try without it.
+ try {
+ client.dropTable("default", tableName = "temporary", ignoreIfNotExists = false,
+ purge = true)
+ assert(!versionsWithoutPurge.contains(version))
+ } catch {
+ case _: UnsupportedOperationException =>
+ assert(versionsWithoutPurge.contains(version))
+ client.dropTable("default", tableName = "temporary", ignoreIfNotExists = false,
+ purge = false)
+ }
assert(client.listTables("default") === Seq("src"))
}
@@ -366,7 +378,20 @@ class VersionsSuite extends SparkFunSuite with Logging {
test(s"$version: dropPartitions") {
val spec = Map("key1" -> "1", "key2" -> "3")
- client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true)
+ val versionsWithoutPurge = versions.takeWhile(_ != "1.2")
+ // Similar to dropTable; try with purge set, and if it fails, make sure we're running
+ // with a version that is older than the minimum (1.2 in this case).
+ try {
+ client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true,
+ purge = true)
+ assert(!versionsWithoutPurge.contains(version))
+ } catch {
+ case _: UnsupportedOperationException =>
+ assert(versionsWithoutPurge.contains(version))
+ client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true,
+ purge = false)
+ }
+
assert(client.getPartitionOption("default", "src_part", spec).isEmpty)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index af8115cf9d..b9e98fc85f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -222,7 +222,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
sql("INSERT INTO TABLE t SELECT * FROM tmp")
checkAnswer(table("t"), (data ++ data).map(Row.fromTuple))
}
- sessionState.catalog.dropTable(TableIdentifier("tmp"), ignoreIfNotExists = true)
+ sessionState.catalog.dropTable(TableIdentifier("tmp"), ignoreIfNotExists = true, purge = false)
}
test("overwriting") {
@@ -232,7 +232,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
checkAnswer(table("t"), data.map(Row.fromTuple))
}
- sessionState.catalog.dropTable(TableIdentifier("tmp"), ignoreIfNotExists = true)
+ sessionState.catalog.dropTable(TableIdentifier("tmp"), ignoreIfNotExists = true, purge = false)
}
test("self-join") {