From 37575115b98fdc9ebadb2ebcbcd9907a3af1076c Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Wed, 27 Apr 2016 14:17:36 -0700
Subject: [SPARK-14940][SQL] Move ExternalCatalog to own file

## What changes were proposed in this pull request?

`interfaces.scala` was getting big. This just moves the biggest class in there to a new file for cleanliness.

## How was this patch tested?

Just moving things around.

Author: Andrew Or

Closes #12721 from andrewor14/move-external-catalog.
---
 .../catalyst/analysis/NoSuchItemException.scala    |   2 +-
 .../sql/catalyst/catalog/ExternalCatalog.scala     | 185 +++++++++++++++++++++
 .../sql/catalyst/catalog/InMemoryCatalog.scala     |   2 +-
 .../sql/catalyst/catalog/SessionCatalog.scala      |   2 +-
 .../spark/sql/catalyst/catalog/interface.scala     | 169 +------------------
 .../spark/sql/execution/command/commands.scala     |   2 +-
 .../apache/spark/sql/execution/command/ddl.scala   |   2 +-
 .../spark/sql/execution/command/tables.scala       |   5 +-
 .../spark/sql/execution/command/DDLSuite.scala     |   2 +-
 .../spark/sql/hive/HiveExternalCatalog.scala       |   2 +-
 .../apache/spark/sql/hive/client/HiveClient.scala  |  17 +-
 .../spark/sql/hive/client/HiveClientImpl.scala     |   8 +-
 12 files changed, 210 insertions(+), 188 deletions(-)
 create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
index 5e18316c94..11ef9e1160 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 
 
 /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
new file mode 100644
index 0000000000..178ae6d7c2
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import org.apache.spark.sql.AnalysisException
+
+
+/**
+ * Interface for the system catalog (of columns, partitions, tables, and databases).
+ *
+ * This is only used for non-temporary items, and implementations must be thread-safe as they
+ * can be accessed in multiple threads.
+ * This is an external catalog because it is expected to
+ * interact with external systems.
+ *
+ * Implementations should throw [[AnalysisException]] when table or database don't exist.
+ */
+abstract class ExternalCatalog {
+  import CatalogTypes.TablePartitionSpec
+
+  protected def requireDbExists(db: String): Unit = {
+    if (!databaseExists(db)) {
+      throw new AnalysisException(s"Database '$db' does not exist")
+    }
+  }
+
+  // --------------------------------------------------------------------------
+  // Databases
+  // --------------------------------------------------------------------------
+
+  def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit
+
+  def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
+
+  /**
+   * Alter a database whose name matches the one specified in `dbDefinition`,
+   * assuming the database exists.
+   *
+   * Note: If the underlying implementation does not support altering a certain field,
+   * this becomes a no-op.
+   */
+  def alterDatabase(dbDefinition: CatalogDatabase): Unit
+
+  def getDatabase(db: String): CatalogDatabase
+
+  def databaseExists(db: String): Boolean
+
+  def listDatabases(): Seq[String]
+
+  def listDatabases(pattern: String): Seq[String]
+
+  def setCurrentDatabase(db: String): Unit
+
+  // --------------------------------------------------------------------------
+  // Tables
+  // --------------------------------------------------------------------------
+
+  def createTable(db: String, tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
+
+  def dropTable(db: String, table: String, ignoreIfNotExists: Boolean): Unit
+
+  def renameTable(db: String, oldName: String, newName: String): Unit
+
+  /**
+   * Alter a table whose name that matches the one specified in `tableDefinition`,
+   * assuming the table exists.
+   *
+   * Note: If the underlying implementation does not support altering a certain field,
+   * this becomes a no-op.
+   */
+  def alterTable(db: String, tableDefinition: CatalogTable): Unit
+
+  def getTable(db: String, table: String): CatalogTable
+
+  def getTableOption(db: String, table: String): Option[CatalogTable]
+
+  def tableExists(db: String, table: String): Boolean
+
+  def listTables(db: String): Seq[String]
+
+  def listTables(db: String, pattern: String): Seq[String]
+
+  def loadTable(
+      db: String,
+      table: String,
+      loadPath: String,
+      isOverwrite: Boolean,
+      holdDDLTime: Boolean): Unit
+
+  def loadPartition(
+      db: String,
+      table: String,
+      loadPath: String,
+      partition: TablePartitionSpec,
+      isOverwrite: Boolean,
+      holdDDLTime: Boolean,
+      inheritTableSpecs: Boolean,
+      isSkewedStoreAsSubdir: Boolean): Unit
+
+  // --------------------------------------------------------------------------
+  // Partitions
+  // --------------------------------------------------------------------------
+
+  def createPartitions(
+      db: String,
+      table: String,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit
+
+  def dropPartitions(
+      db: String,
+      table: String,
+      parts: Seq[TablePartitionSpec],
+      ignoreIfNotExists: Boolean): Unit
+
+  /**
+   * Override the specs of one or many existing table partitions, assuming they exist.
+   * This assumes index i of `specs` corresponds to index i of `newSpecs`.
+   */
+  def renamePartitions(
+      db: String,
+      table: String,
+      specs: Seq[TablePartitionSpec],
+      newSpecs: Seq[TablePartitionSpec]): Unit
+
+  /**
+   * Alter one or many table partitions whose specs that match those specified in `parts`,
+   * assuming the partitions exist.
+   *
+   * Note: If the underlying implementation does not support altering a certain field,
+   * this becomes a no-op.
+   */
+  def alterPartitions(
+      db: String,
+      table: String,
+      parts: Seq[CatalogTablePartition]): Unit
+
+  def getPartition(db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition
+
+  /**
+   * List the metadata of all partitions that belong to the specified table, assuming it exists.
+   *
+   * A partial partition spec may optionally be provided to filter the partitions returned.
+   * For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'),
+   * then a partial spec of (a='1') will return the first two only.
+   * @param db database name
+   * @param table table name
+   * @param partialSpec partition spec
+   */
+  def listPartitions(
+      db: String,
+      table: String,
+      partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition]
+
+  // --------------------------------------------------------------------------
+  // Functions
+  // --------------------------------------------------------------------------
+
+  def createFunction(db: String, funcDefinition: CatalogFunction): Unit
+
+  def dropFunction(db: String, funcName: String): Unit
+
+  def renameFunction(db: String, oldName: String, newName: String): Unit
+
+  def getFunction(db: String, funcName: String): CatalogFunction
+
+  def functionExists(db: String, funcName: String): Boolean
+
+  def listFunctions(db: String, pattern: String): Seq[String]
+
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 28a67067d0..60eb7329f7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.util.StringUtils
  * All public methods should be synchronized for thread-safety.
  */
 class InMemoryCatalog extends ExternalCatalog {
-  import ExternalCatalog._
+  import CatalogTypes.TablePartitionSpec
 
   private class TableDesc(var table: CatalogTable) {
     val partitions = new mutable.HashMap[TablePartitionSpec, CatalogTablePartition]
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 91d35de790..d7fd54308a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -45,7 +45,7 @@ class SessionCatalog(
     functionResourceLoader: FunctionResourceLoader,
     functionRegistry: FunctionRegistry,
     conf: CatalystConf) extends Logging {
-  import ExternalCatalog._
+  import CatalogTypes.TablePartitionSpec
 
   def this(
       externalCatalog: ExternalCatalog,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index d1e2b3f664..5efaf8f201 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -26,171 +26,6 @@ import org.apache.spark.sql.catalyst.parser.DataTypeParser
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
 
 
-/**
- * Interface for the system catalog (of columns, partitions, tables, and databases).
- *
- * This is only used for non-temporary items, and implementations must be thread-safe as they
- * can be accessed in multiple threads. This is an external catalog because it is expected to
- * interact with external systems.
- *
- * Implementations should throw [[AnalysisException]] when table or database don't exist.
- */
-abstract class ExternalCatalog {
-  import ExternalCatalog._
-
-  protected def requireDbExists(db: String): Unit = {
-    if (!databaseExists(db)) {
-      throw new AnalysisException(s"Database '$db' does not exist")
-    }
-  }
-
-  // --------------------------------------------------------------------------
-  // Databases
-  // --------------------------------------------------------------------------
-
-  def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit
-
-  def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
-
-  /**
-   * Alter a database whose name matches the one specified in `dbDefinition`,
-   * assuming the database exists.
-   *
-   * Note: If the underlying implementation does not support altering a certain field,
-   * this becomes a no-op.
-   */
-  def alterDatabase(dbDefinition: CatalogDatabase): Unit
-
-  def getDatabase(db: String): CatalogDatabase
-
-  def databaseExists(db: String): Boolean
-
-  def listDatabases(): Seq[String]
-
-  def listDatabases(pattern: String): Seq[String]
-
-  def setCurrentDatabase(db: String): Unit
-
-  // --------------------------------------------------------------------------
-  // Tables
-  // --------------------------------------------------------------------------
-
-  def createTable(db: String, tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
-
-  def dropTable(db: String, table: String, ignoreIfNotExists: Boolean): Unit
-
-  def renameTable(db: String, oldName: String, newName: String): Unit
-
-  /**
-   * Alter a table whose name that matches the one specified in `tableDefinition`,
-   * assuming the table exists.
-   *
-   * Note: If the underlying implementation does not support altering a certain field,
-   * this becomes a no-op.
-   */
-  def alterTable(db: String, tableDefinition: CatalogTable): Unit
-
-  def getTable(db: String, table: String): CatalogTable
-
-  def getTableOption(db: String, table: String): Option[CatalogTable]
-
-  def tableExists(db: String, table: String): Boolean
-
-  def listTables(db: String): Seq[String]
-
-  def listTables(db: String, pattern: String): Seq[String]
-
-  def loadTable(
-      db: String,
-      table: String,
-      loadPath: String,
-      isOverwrite: Boolean,
-      holdDDLTime: Boolean): Unit
-
-  def loadPartition(
-      db: String,
-      table: String,
-      loadPath: String,
-      partition: TablePartitionSpec,
-      isOverwrite: Boolean,
-      holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean,
-      isSkewedStoreAsSubdir: Boolean): Unit
-
-  // --------------------------------------------------------------------------
-  // Partitions
-  // --------------------------------------------------------------------------
-
-  def createPartitions(
-      db: String,
-      table: String,
-      parts: Seq[CatalogTablePartition],
-      ignoreIfExists: Boolean): Unit
-
-  def dropPartitions(
-      db: String,
-      table: String,
-      parts: Seq[TablePartitionSpec],
-      ignoreIfNotExists: Boolean): Unit
-
-  /**
-   * Override the specs of one or many existing table partitions, assuming they exist.
-   * This assumes index i of `specs` corresponds to index i of `newSpecs`.
-   */
-  def renamePartitions(
-      db: String,
-      table: String,
-      specs: Seq[TablePartitionSpec],
-      newSpecs: Seq[TablePartitionSpec]): Unit
-
-  /**
-   * Alter one or many table partitions whose specs that match those specified in `parts`,
-   * assuming the partitions exist.
-   *
-   * Note: If the underlying implementation does not support altering a certain field,
-   * this becomes a no-op.
-   */
-  def alterPartitions(
-      db: String,
-      table: String,
-      parts: Seq[CatalogTablePartition]): Unit
-
-  def getPartition(db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition
-
-  /**
-   * List the metadata of all partitions that belong to the specified table, assuming it exists.
-   *
-   * A partial partition spec may optionally be provided to filter the partitions returned.
-   * For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'),
-   * then a partial spec of (a='1') will return the first two only.
-   * @param db database name
-   * @param table table name
-   * @param partialSpec partition spec
-   */
-  def listPartitions(
-      db: String,
-      table: String,
-      partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition]
-
-  // --------------------------------------------------------------------------
-  // Functions
-  // --------------------------------------------------------------------------
-
-  def createFunction(db: String, funcDefinition: CatalogFunction): Unit
-
-  def dropFunction(db: String, funcName: String): Unit
-
-  def renameFunction(db: String, oldName: String, newName: String): Unit
-
-  def getFunction(db: String, funcName: String): CatalogFunction
-
-  def functionExists(db: String, funcName: String): Boolean
-
-  def listFunctions(db: String, pattern: String): Seq[String]
-
-}
-
-
 /**
  * A function defined in the catalog.
  *
@@ -235,7 +70,7 @@ case class CatalogColumn(
  * @param storage storage format of the partition
  */
 case class CatalogTablePartition(
-    spec: ExternalCatalog.TablePartitionSpec,
+    spec: CatalogTypes.TablePartitionSpec,
     storage: CatalogStorageFormat)
 
 
@@ -316,7 +151,7 @@ case class CatalogDatabase(
     properties: Map[String, String])
 
 
-object ExternalCatalog {
+object CatalogTypes {
   /**
    * Specifications of a table partition. Mapping column name to column value.
   */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 855e7e2fe3..5f9287b3b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -23,7 +23,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
-import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 4a9a1603d0..44647116b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog}
-import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.types._
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index f38e260bc9..6078918316 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -24,7 +24,8 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogTableType, ExternalCatalog}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
 import org.apache.spark.sql.types.{BooleanType, MetadataBuilder, StringType}
@@ -156,7 +157,7 @@ case class LoadData(
     path: String,
     isLocal: Boolean,
     isOverwrite: Boolean,
-    partition: Option[ExternalCatalog.TablePartitionSpec]) extends RunnableCommand {
+    partition: Option[TablePartitionSpec]) extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 58330c49c7..a9a9bf76be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
-import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.test.SharedSQLContext
 
 class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 313093818f..ee048b2588 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.hive.client.HiveClient
  * All public methods must be synchronized for thread-safety.
  */
 private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCatalog with Logging {
-  import ExternalCatalog._
+  import CatalogTypes.TablePartitionSpec
 
   // Exceptions thrown by the hive client that we would like to wrap
   private val clientExceptions = Set(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index ef08a39c17..b224664050 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -21,6 +21,7 @@ import java.io.PrintStream
 
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
 
 
@@ -119,7 +120,7 @@ private[hive] trait HiveClient {
   def dropPartitions(
       db: String,
       table: String,
-      specs: Seq[ExternalCatalog.TablePartitionSpec],
+      specs: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean): Unit
 
   /**
@@ -128,8 +129,8 @@ private[hive] trait HiveClient {
   def renamePartitions(
       db: String,
       table: String,
-      specs: Seq[ExternalCatalog.TablePartitionSpec],
-      newSpecs: Seq[ExternalCatalog.TablePartitionSpec]): Unit
+      specs: Seq[TablePartitionSpec],
+      newSpecs: Seq[TablePartitionSpec]): Unit
 
   /**
    * Alter one or more table partitions whose specs match the ones specified in `newParts`,
@@ -144,7 +145,7 @@ private[hive] trait HiveClient {
   final def getPartition(
       dbName: String,
       tableName: String,
-      spec: ExternalCatalog.TablePartitionSpec): CatalogTablePartition = {
+      spec: TablePartitionSpec): CatalogTablePartition = {
     getPartitionOption(dbName, tableName, spec).getOrElse {
       throw new NoSuchPartitionException(dbName, tableName, spec)
     }
@@ -154,14 +155,14 @@ private[hive] trait HiveClient {
   final def getPartitionOption(
       db: String,
       table: String,
-      spec: ExternalCatalog.TablePartitionSpec): Option[CatalogTablePartition] = {
+      spec: TablePartitionSpec): Option[CatalogTablePartition] = {
     getPartitionOption(getTable(db, table), spec)
   }
 
   /** Returns the specified partition or None if it does not exist. */
   def getPartitionOption(
       table: CatalogTable,
-      spec: ExternalCatalog.TablePartitionSpec): Option[CatalogTablePartition]
+      spec: TablePartitionSpec): Option[CatalogTablePartition]
 
   /**
    * Returns the partitions for the given table that match the supplied partition spec.
@@ -170,7 +171,7 @@ private[hive] trait HiveClient {
   final def getPartitions(
       db: String,
       table: String,
-      partialSpec: Option[ExternalCatalog.TablePartitionSpec]): Seq[CatalogTablePartition] = {
+      partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = {
     getPartitions(getTable(db, table), partialSpec)
   }
 
@@ -180,7 +181,7 @@ private[hive] trait HiveClient {
   */
   def getPartitions(
       table: CatalogTable,
-      partialSpec: Option[ExternalCatalog.TablePartitionSpec] = None): Seq[CatalogTablePartition]
+      partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition]
 
   /** Returns partitions filtered by predicates for the given table.
    */
   def getPartitionsByFilter(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index d651791f9c..c98eaa0d15 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException}
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.util.{CircularBuffer, Utils}
@@ -375,7 +375,7 @@ private[hive] class HiveClientImpl(
   override def dropPartitions(
       db: String,
       table: String,
-      specs: Seq[ExternalCatalog.TablePartitionSpec],
+      specs: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean): Unit = withHiveState {
     // TODO: figure out how to drop multiple partitions in one call
     val hiveTable = client.getTable(db, table, true /* throw exception */)
@@ -399,8 +399,8 @@ private[hive] class HiveClientImpl(
   override def renamePartitions(
       db: String,
       table: String,
-      specs: Seq[ExternalCatalog.TablePartitionSpec],
-      newSpecs: Seq[ExternalCatalog.TablePartitionSpec]): Unit = withHiveState {
+      specs: Seq[TablePartitionSpec],
+      newSpecs: Seq[TablePartitionSpec]): Unit = withHiveState {
     require(specs.size == newSpecs.size, "number of old and new partition specs differ")
     val catalogTable = getTable(db, table)
     val hiveTable = toHiveTable(catalogTable)
-- 
cgit v1.2.3
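
For readers tracing the rename through the diffs above, the net effect on downstream code is an import change only: the `TablePartitionSpec` alias (a map from partition column name to column value, per the `CatalogTypes` doc comment) now lives on the new `CatalogTypes` object instead of the old `ExternalCatalog` companion object. The following standalone Scala sketch is illustrative and not part of the patch; the object name and sample values are invented for the example.

```scala
// Hypothetical caller-side snippet (not from the patch) showing the relocated alias.
// Before this commit the import was:
//   import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
// After this commit it becomes:
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec

object TablePartitionSpecExample {
  def main(args: Array[String]): Unit = {
    // The alias itself is unchanged by the move: partition column name -> column value.
    val spec: TablePartitionSpec = Map("a" -> "1", "b" -> "2")
    println(spec)
  }
}
```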
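
The `listPartitions` contract described in the new `ExternalCatalog.scala` scaladoc (a partial spec such as (a='1') keeps only the partitions that agree with it on every column it mentions) can be illustrated with plain collections. This is a minimal sketch of the documented filtering rule under that reading, not the actual catalog implementation; the helper names are made up for the example.

```scala
// Standalone illustration of partial partition-spec filtering as described in the
// listPartitions scaladoc; no Spark classes are required.
object PartialSpecFilterExample {
  type TablePartitionSpec = Map[String, String]

  // A partition matches a partial spec when it has the same value for every
  // column the partial spec mentions; columns not mentioned are unconstrained.
  def matches(partial: TablePartitionSpec, full: TablePartitionSpec): Boolean =
    partial.forall { case (col, value) => full.get(col).contains(value) }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(
      Map("a" -> "1", "b" -> "2"),
      Map("a" -> "1", "b" -> "3"),
      Map("a" -> "2", "b" -> "4"))

    // As in the doc comment: the partial spec (a='1') selects the first two partitions only.
    println(partitions.filter(matches(Map("a" -> "1"), _)))
  }
}
```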