diff options
author | gatorsmile <gatorsmile@gmail.com> | 2016-09-04 15:04:33 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2016-09-04 15:04:33 +0800 |
commit | 6b156e2fcf9c0c1ed0770a7ad9c54fa374760e17 (patch) | |
tree | 593ff90402b847fe4ed225e961d4d34e506eb62b /sql/catalyst | |
parent | e9b58e9ef89a9118b6d5a466d10db8e30d61f850 (diff) | |
download | spark-6b156e2fcf9c0c1ed0770a7ad9c54fa374760e17.tar.gz spark-6b156e2fcf9c0c1ed0770a7ad9c54fa374760e17.tar.bz2 spark-6b156e2fcf9c0c1ed0770a7ad9c54fa374760e17.zip |
[SPARK-17324][SQL] Remove Direct Usage of HiveClient in InsertIntoHiveTable
### What changes were proposed in this pull request?
This is another step toward removing HiveClient from `HiveSessionState`. All metastore interactions should go through the `ExternalCatalog` interface. However, the existing implementation of `InsertIntoHiveTable` still requires Hive clients. This PR removes that direct usage of HiveClient by moving the metastore interactions into `ExternalCatalog`.
### How was this patch tested?
Existing test cases
Author: gatorsmile <gatorsmile@gmail.com>
Closes #14888 from gatorsmile/removeClientFromInsertIntoHiveTable.
Diffstat (limited to 'sql/catalyst')
3 files changed, 44 insertions, 8 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index df72baaba2..dd93b467ee 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -121,8 +121,16 @@ abstract class ExternalCatalog { partition: TablePartitionSpec, isOverwrite: Boolean, holdDDLTime: Boolean, - inheritTableSpecs: Boolean, - isSkewedStoreAsSubdir: Boolean): Unit + inheritTableSpecs: Boolean): Unit + + def loadDynamicPartitions( + db: String, + table: String, + loadPath: String, + partition: TablePartitionSpec, + replace: Boolean, + numDP: Int, + holdDDLTime: Boolean): Unit // -------------------------------------------------------------------------- // Partitions @@ -166,6 +174,14 @@ abstract class ExternalCatalog { def getPartition(db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition /** + * Returns the specified partition or None if it does not exist. + */ + def getPartitionOption( + db: String, + table: String, + spec: TablePartitionSpec): Option[CatalogTablePartition] + + /** * List the metadata of all partitions that belong to the specified table, assuming it exists. * * A partial partition spec may optionally be provided to filter the partitions returned. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 4e361a536d..3e31127118 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -305,11 +305,21 @@ class InMemoryCatalog( partition: TablePartitionSpec, isOverwrite: Boolean, holdDDLTime: Boolean, - inheritTableSpecs: Boolean, - isSkewedStoreAsSubdir: Boolean): Unit = { + inheritTableSpecs: Boolean): Unit = { throw new UnsupportedOperationException("loadPartition is not implemented.") } + override def loadDynamicPartitions( + db: String, + table: String, + loadPath: String, + partition: TablePartitionSpec, + replace: Boolean, + numDP: Int, + holdDDLTime: Boolean): Unit = { + throw new UnsupportedOperationException("loadDynamicPartitions is not implemented.") + } + // -------------------------------------------------------------------------- // Partitions // -------------------------------------------------------------------------- @@ -444,6 +454,17 @@ class InMemoryCatalog( catalog(db).tables(table).partitions(spec) } + override def getPartitionOption( + db: String, + table: String, + spec: TablePartitionSpec): Option[CatalogTablePartition] = synchronized { + if (!partitionExists(db, table, spec)) { + None + } else { + Option(catalog(db).tables(table).partitions(spec)) + } + } + override def listPartitions( db: String, table: String, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 62d0da076b..e7132cd397 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -309,14 +309,13 @@ class SessionCatalog( partition: TablePartitionSpec, isOverwrite: Boolean, holdDDLTime: Boolean, - inheritTableSpecs: Boolean, - isSkewedStoreAsSubdir: Boolean): Unit = { + inheritTableSpecs: Boolean): Unit = { val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase)) val table = formatTableName(name.table) requireDbExists(db) requireTableExists(TableIdentifier(table, Some(db))) - externalCatalog.loadPartition(db, table, loadPath, partition, isOverwrite, holdDDLTime, - inheritTableSpecs, isSkewedStoreAsSubdir) + externalCatalog.loadPartition( + db, table, loadPath, partition, isOverwrite, holdDDLTime, inheritTableSpecs) } def defaultTablePath(tableIdent: TableIdentifier): String = { |