author    gatorsmile <gatorsmile@gmail.com>      2016-09-04 15:04:33 +0800
committer Wenchen Fan <wenchen@databricks.com>   2016-09-04 15:04:33 +0800
commit    6b156e2fcf9c0c1ed0770a7ad9c54fa374760e17
tree      593ff90402b847fe4ed225e961d4d34e506eb62b
parent    e9b58e9ef89a9118b6d5a466d10db8e30d61f850
[SPARK-17324][SQL] Remove Direct Usage of HiveClient in InsertIntoHiveTable
### What changes were proposed in this pull request?

This is another step toward removing HiveClient from `HiveSessionState`: all metastore interactions should go through the `ExternalCatalog` interface. However, the existing implementation of `InsertIntoHiveTable` still requires a Hive client. This PR removes that dependency by moving the metastore interactions into `ExternalCatalog`.

### How was this patch tested?

Existing test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #14888 from gatorsmile/removeClientFromInsertIntoHiveTable.
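For quick reference, these are the two methods this patch adds to the `ExternalCatalog` interface, with signatures reproduced verbatim from the diff below:

```scala
// New ExternalCatalog hooks introduced by this patch (see diff below).
def loadDynamicPartitions(
    db: String,
    table: String,
    loadPath: String,
    partition: TablePartitionSpec,
    replace: Boolean,
    numDP: Int,
    holdDDLTime: Boolean): Unit

def getPartitionOption(
    db: String,
    table: String,
    spec: TablePartitionSpec): Option[CatalogTablePartition]
```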
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala | 20
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala | 25
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala  |  7
3 files changed, 44 insertions(+), 8 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index df72baaba2..dd93b467ee 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -121,8 +121,16 @@ abstract class ExternalCatalog {
       partition: TablePartitionSpec,
       isOverwrite: Boolean,
       holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean,
-      isSkewedStoreAsSubdir: Boolean): Unit
+      inheritTableSpecs: Boolean): Unit
+
+  def loadDynamicPartitions(
+      db: String,
+      table: String,
+      loadPath: String,
+      partition: TablePartitionSpec,
+      replace: Boolean,
+      numDP: Int,
+      holdDDLTime: Boolean): Unit
 
   // --------------------------------------------------------------------------
   // Partitions
@@ -166,6 +174,14 @@ abstract class ExternalCatalog {
   def getPartition(db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition
 
   /**
+   * Returns the specified partition or None if it does not exist.
+   */
+  def getPartitionOption(
+      db: String,
+      table: String,
+      spec: TablePartitionSpec): Option[CatalogTablePartition]
+
+  /**
    * List the metadata of all partitions that belong to the specified table, assuming it exists.
    *
    * A partial partition spec may optionally be provided to filter the partitions returned.
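As a rough illustration of how a dynamic-partition insert might call the new `loadDynamicPartitions` hook once `InsertIntoHiveTable` goes through `ExternalCatalog`: the catalog instance, database/table names, path, and the empty-string convention for dynamic columns below are assumptions for the sketch, not taken from this patch.

```scala
// Hypothetical caller sketch; all argument values are illustrative.
val spec = Map("ds" -> "2016-09-04", "hr" -> "")  // assumed convention: empty value marks a dynamic column
externalCatalog.loadDynamicPartitions(
  db = "db1",
  table = "sales",
  loadPath = "/tmp/staging/sales",
  partition = spec,
  replace = true,        // overwrite rows in matching partitions
  numDP = 1,             // number of dynamic partition columns (here: hr)
  holdDDLTime = false)
```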
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 4e361a536d..3e31127118 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -305,11 +305,21 @@ class InMemoryCatalog(
       partition: TablePartitionSpec,
       isOverwrite: Boolean,
       holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean,
-      isSkewedStoreAsSubdir: Boolean): Unit = {
+      inheritTableSpecs: Boolean): Unit = {
     throw new UnsupportedOperationException("loadPartition is not implemented.")
   }
 
+  override def loadDynamicPartitions(
+      db: String,
+      table: String,
+      loadPath: String,
+      partition: TablePartitionSpec,
+      replace: Boolean,
+      numDP: Int,
+      holdDDLTime: Boolean): Unit = {
+    throw new UnsupportedOperationException("loadDynamicPartitions is not implemented.")
+  }
+
   // --------------------------------------------------------------------------
   // Partitions
   // --------------------------------------------------------------------------
@@ -444,6 +454,17 @@ class InMemoryCatalog(
     catalog(db).tables(table).partitions(spec)
   }
 
+  override def getPartitionOption(
+      db: String,
+      table: String,
+      spec: TablePartitionSpec): Option[CatalogTablePartition] = synchronized {
+    if (!partitionExists(db, table, spec)) {
+      None
+    } else {
+      Option(catalog(db).tables(table).partitions(spec))
+    }
+  }
+
   override def listPartitions(
       db: String,
       table: String,
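A small usage sketch for the `getPartitionOption` implementation above: unlike `getPartition`, which assumes the partition exists, it lets callers branch on absence without catching an exception. The database, table, and spec values are made up for the example.

```scala
// Illustrative lookup; "db1", "sales", and the spec are assumed names.
val spec = Map("ds" -> "2016-09-04")
catalog.getPartitionOption("db1", "sales", spec) match {
  case Some(part) => println(s"partition exists: ${part.spec}")
  case None       => println("partition not found")
}
```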
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 62d0da076b..e7132cd397 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -309,14 +309,13 @@ class SessionCatalog(
       partition: TablePartitionSpec,
       isOverwrite: Boolean,
       holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean,
-      isSkewedStoreAsSubdir: Boolean): Unit = {
+      inheritTableSpecs: Boolean): Unit = {
     val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(name.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Some(db)))
-    externalCatalog.loadPartition(db, table, loadPath, partition, isOverwrite, holdDDLTime,
-      inheritTableSpecs, isSkewedStoreAsSubdir)
+    externalCatalog.loadPartition(
+      db, table, loadPath, partition, isOverwrite, holdDDLTime, inheritTableSpecs)
   }
 
   def defaultTablePath(tableIdent: TableIdentifier): String = {
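To round out the picture, a sketch of the call path through `SessionCatalog.loadPartition` after this change, per the hunk above: the session catalog instance and all argument values are assumptions for the example. The point is that name resolution and validation happen at this layer, and the forwarded call needs only the `ExternalCatalog` interface, not a `HiveClient`.

```scala
// Hypothetical call; values are illustrative.
sessionCatalog.loadPartition(
  TableIdentifier("sales", Some("db1")),
  loadPath = "/tmp/staging/sales",
  partition = Map("ds" -> "2016-09-04"),
  isOverwrite = false,
  holdDDLTime = false,
  inheritTableSpecs = true)
// Internally: formatDatabaseName/formatTableName normalize the names,
// requireDbExists/requireTableExists validate them, and the call is then
// forwarded to externalCatalog.loadPartition with the same arguments.
```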