From 6b156e2fcf9c0c1ed0770a7ad9c54fa374760e17 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sun, 4 Sep 2016 15:04:33 +0800
Subject: [SPARK-17324][SQL] Remove Direct Usage of HiveClient in InsertIntoHiveTable

### What changes were proposed in this pull request?
This is another step toward removing HiveClient from `HiveSessionState`: all
metastore interactions should go through the `ExternalCatalog` interface.
However, the existing implementation of `InsertIntoHiveTable` still requires a
Hive client. This PR removes that direct usage by moving the metastore
interactions into `ExternalCatalog`.

### How was this patch tested?
Existing test cases.

Author: gatorsmile

Closes #14888 from gatorsmile/removeClientFromInsertIntoHiveTable.
---
 .../sql/catalyst/catalog/ExternalCatalog.scala | 20 +++++++++++++++--
 .../sql/catalyst/catalog/InMemoryCatalog.scala | 25 ++++++++++++++++++++--
 .../sql/catalyst/catalog/SessionCatalog.scala  |  7 +++---
 3 files changed, 44 insertions(+), 8 deletions(-)

(limited to 'sql/catalyst')

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index df72baaba2..dd93b467ee 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -121,8 +121,16 @@ abstract class ExternalCatalog {
       partition: TablePartitionSpec,
       isOverwrite: Boolean,
       holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean,
-      isSkewedStoreAsSubdir: Boolean): Unit
+      inheritTableSpecs: Boolean): Unit
+
+  def loadDynamicPartitions(
+      db: String,
+      table: String,
+      loadPath: String,
+      partition: TablePartitionSpec,
+      replace: Boolean,
+      numDP: Int,
+      holdDDLTime: Boolean): Unit
 
   // --------------------------------------------------------------------------
   // Partitions
@@ -165,6 +173,14 @@ abstract class ExternalCatalog {
 
   def getPartition(db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition
 
+  /**
+   * Returns the specified partition or None if it does not exist.
+   */
+  def getPartitionOption(
+      db: String,
+      table: String,
+      spec: TablePartitionSpec): Option[CatalogTablePartition]
+
   /**
    * List the metadata of all partitions that belong to the specified table, assuming it exists.
    *
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 4e361a536d..3e31127118 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -305,11 +305,21 @@ class InMemoryCatalog(
       partition: TablePartitionSpec,
       isOverwrite: Boolean,
       holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean,
-      isSkewedStoreAsSubdir: Boolean): Unit = {
+      inheritTableSpecs: Boolean): Unit = {
     throw new UnsupportedOperationException("loadPartition is not implemented.")
   }
 
+  override def loadDynamicPartitions(
+      db: String,
+      table: String,
+      loadPath: String,
+      partition: TablePartitionSpec,
+      replace: Boolean,
+      numDP: Int,
+      holdDDLTime: Boolean): Unit = {
+    throw new UnsupportedOperationException("loadDynamicPartitions is not implemented.")
+  }
+
   // --------------------------------------------------------------------------
   // Partitions
   // --------------------------------------------------------------------------
@@ -444,6 +454,17 @@ class InMemoryCatalog(
     catalog(db).tables(table).partitions(spec)
   }
 
+  override def getPartitionOption(
+      db: String,
+      table: String,
+      spec: TablePartitionSpec): Option[CatalogTablePartition] = synchronized {
+    if (!partitionExists(db, table, spec)) {
+      None
+    } else {
+      Option(catalog(db).tables(table).partitions(spec))
+    }
+  }
+
   override def listPartitions(
       db: String,
       table: String,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 62d0da076b..e7132cd397 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -309,14 +309,13 @@ class SessionCatalog(
       partition: TablePartitionSpec,
       isOverwrite: Boolean,
       holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean,
-      isSkewedStoreAsSubdir: Boolean): Unit = {
+      inheritTableSpecs: Boolean): Unit = {
     val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(name.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Some(db)))
-    externalCatalog.loadPartition(db, table, loadPath, partition, isOverwrite, holdDDLTime,
-      inheritTableSpecs, isSkewedStoreAsSubdir)
+    externalCatalog.loadPartition(
+      db, table, loadPath, partition, isOverwrite, holdDDLTime, inheritTableSpecs)
   }
 
   def defaultTablePath(tableIdent: TableIdentifier): String = {
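Editor's note: the sketch below illustrates how a caller on the write path (ultimately `InsertIntoHiveTable`, whose changes live under `sql/hive` and are therefore not part of this `sql/catalyst`-only diff) can drive partition loads through the catalog APIs above instead of holding a `HiveClient` directly. The `CatalogBackedInsert` helper, its parameter names, and the leading `(name, loadPath)` argument order of `SessionCatalog.loadPartition` are assumptions for illustration; only the `ExternalCatalog` and `SessionCatalog` method signatures visible in this patch are taken as given.

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, ExternalCatalog, SessionCatalog}

// Hypothetical helper: routes the metastore work previously done through
// HiveClient to the catalog interfaces changed in this patch.
class CatalogBackedInsert(catalog: SessionCatalog, externalCatalog: ExternalCatalog) {

  // A partition spec (TablePartitionSpec) is a Map[String, String], e.g.
  // Map("ds" -> "2016-09-04", "hr" -> "11").
  type PartitionSpec = Map[String, String]

  // Static-partition insert: every partition column value is known up front.
  def loadStaticPartition(
      table: TableIdentifier,
      stagingDir: String,
      spec: PartitionSpec,
      overwrite: Boolean): Unit = {
    catalog.loadPartition(
      table, stagingDir, spec,
      isOverwrite = overwrite,
      holdDDLTime = false,
      inheritTableSpecs = true)
  }

  // Dynamic-partition insert: the trailing numDP partition columns are
  // discovered from the data written to the staging directory.
  def loadDynamicPartitions(
      db: String,
      table: String,
      stagingDir: String,
      partialSpec: PartitionSpec,
      numDynamicPartitions: Int,
      overwrite: Boolean): Unit = {
    externalCatalog.loadDynamicPartitions(
      db, table, stagingDir, partialSpec,
      replace = overwrite,
      numDP = numDynamicPartitions,
      holdDDLTime = false)
  }

  // getPartitionOption lets the insert path probe for an existing partition
  // without catching the exception thrown by getPartition when it is missing.
  def existingPartition(
      db: String, table: String, spec: PartitionSpec): Option[CatalogTablePartition] = {
    externalCatalog.getPartitionOption(db, table, spec)
  }
}
```

The design point of the PR is that once these operations live on `ExternalCatalog`, the Hive-specific client stays encapsulated behind the Hive implementation of that interface, and write-path code like the sketch above no longer needs to reference it.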