diff options
author | Andrew Or <andrew@databricks.com> | 2016-02-21 15:00:24 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-02-21 15:00:24 -0800 |
commit | 6c3832b26e119626205732b8fd03c8f5ba986896 (patch) | |
tree | c23d83055b66647662414f4a5f835ec30efbe64f /sql/hive | |
parent | 7eb83fefd19e137d80a23b5174b66b14831c291a (diff) | |
download | spark-6c3832b26e119626205732b8fd03c8f5ba986896.tar.gz spark-6c3832b26e119626205732b8fd03c8f5ba986896.tar.bz2 spark-6c3832b26e119626205732b8fd03c8f5ba986896.zip |
[SPARK-13080][SQL] Implement new Catalog API using Hive
## What changes were proposed in this pull request?
This is a step towards merging `SQLContext` and `HiveContext`. A new internal Catalog API was introduced in #10982 and extended in #11069. This patch introduces an implementation of this API using `HiveClient`, an existing interface to Hive. It also extends `HiveClient` with additional calls to Hive that are needed to complete the catalog implementation.
*Where should I start reviewing?* The new catalog introduced is `HiveCatalog`. This class is relatively simple because it just calls `HiveClientImpl`, where most of the new logic is. I would not start with `HiveClient`, `HiveQl`, or `HiveMetastoreCatalog`, which are modified mainly because of a refactor.
*Why is this patch so big?* I had to refactor HiveClient to remove an intermediate representation of databases, tables, partitions etc. After this refactor `CatalogTable` convert directly to and from `HiveTable` (etc.). Otherwise we would have to first convert `CatalogTable` to the intermediate representation and then convert that to HiveTable, which is messy.
The new class hierarchy is as follows:
```
org.apache.spark.sql.catalyst.catalog.Catalog
- org.apache.spark.sql.catalyst.catalog.InMemoryCatalog
- org.apache.spark.sql.hive.HiveCatalog
```
Note that, as of this patch, none of these classes are currently used anywhere yet. This will come in the future before the Spark 2.0 release.
## How was the this patch tested?
All existing unit tests, and HiveCatalogSuite that extends CatalogTestCases.
Author: Andrew Or <andrew@databricks.com>
Author: Reynold Xin <rxin@databricks.com>
Closes #11293 from rxin/hive-catalog.
Diffstat (limited to 'sql/hive')
15 files changed, 1031 insertions, 460 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala new file mode 100644 index 0000000000..21b9cfb820 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import scala.util.control.NonFatal + +import org.apache.hadoop.hive.ql.metadata.HiveException +import org.apache.thrift.TException + +import org.apache.spark.Logging +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.analysis.NoSuchItemException +import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.hive.client.HiveClient + + +/** + * A persistent implementation of the system catalog using Hive. + * All public methods must be synchronized for thread-safety. + */ +private[spark] class HiveCatalog(client: HiveClient) extends Catalog with Logging { + import Catalog._ + + // Exceptions thrown by the hive client that we would like to wrap + private val clientExceptions = Set( + classOf[HiveException].getCanonicalName, + classOf[TException].getCanonicalName) + + /** + * Whether this is an exception thrown by the hive client that should be wrapped. + * + * Due to classloader isolation issues, pattern matching won't work here so we need + * to compare the canonical names of the exceptions, which we assume to be stable. + */ + private def isClientException(e: Throwable): Boolean = { + var temp: Class[_] = e.getClass + var found = false + while (temp != null && !found) { + found = clientExceptions.contains(temp.getCanonicalName) + temp = temp.getSuperclass + } + found + } + + /** + * Run some code involving `client` in a [[synchronized]] block and wrap certain + * exceptions thrown in the process in [[AnalysisException]]. + */ + private def withClient[T](body: => T): T = synchronized { + try { + body + } catch { + case e: NoSuchItemException => + throw new AnalysisException(e.getMessage) + case NonFatal(e) if isClientException(e) => + throw new AnalysisException(e.getClass.getCanonicalName + ": " + e.getMessage) + } + } + + private def requireDbMatches(db: String, table: CatalogTable): Unit = { + if (table.specifiedDatabase != Some(db)) { + throw new AnalysisException( + s"Provided database $db does not much the one specified in the " + + s"table definition (${table.specifiedDatabase.getOrElse("n/a")})") + } + } + + private def requireTableExists(db: String, table: String): Unit = { + withClient { getTable(db, table) } + } + + + // -------------------------------------------------------------------------- + // Databases + // -------------------------------------------------------------------------- + + override def createDatabase( + dbDefinition: CatalogDatabase, + ignoreIfExists: Boolean): Unit = withClient { + client.createDatabase(dbDefinition, ignoreIfExists) + } + + override def dropDatabase( + db: String, + ignoreIfNotExists: Boolean, + cascade: Boolean): Unit = withClient { + client.dropDatabase(db, ignoreIfNotExists, cascade) + } + + /** + * Alter a database whose name matches the one specified in `dbDefinition`, + * assuming the database exists. + * + * Note: As of now, this only supports altering database properties! + */ + override def alterDatabase(dbDefinition: CatalogDatabase): Unit = withClient { + val existingDb = getDatabase(dbDefinition.name) + if (existingDb.properties == dbDefinition.properties) { + logWarning(s"Request to alter database ${dbDefinition.name} is a no-op because " + + s"the provided database properties are the same as the old ones. Hive does not " + + s"currently support altering other database fields.") + } + client.alterDatabase(dbDefinition) + } + + override def getDatabase(db: String): CatalogDatabase = withClient { + client.getDatabase(db) + } + + override def databaseExists(db: String): Boolean = withClient { + client.getDatabaseOption(db).isDefined + } + + override def listDatabases(): Seq[String] = withClient { + client.listDatabases("*") + } + + override def listDatabases(pattern: String): Seq[String] = withClient { + client.listDatabases(pattern) + } + + override def setCurrentDatabase(db: String): Unit = withClient { + client.setCurrentDatabase(db) + } + + // -------------------------------------------------------------------------- + // Tables + // -------------------------------------------------------------------------- + + override def createTable( + db: String, + tableDefinition: CatalogTable, + ignoreIfExists: Boolean): Unit = withClient { + requireDbExists(db) + requireDbMatches(db, tableDefinition) + client.createTable(tableDefinition, ignoreIfExists) + } + + override def dropTable( + db: String, + table: String, + ignoreIfNotExists: Boolean): Unit = withClient { + requireDbExists(db) + client.dropTable(db, table, ignoreIfNotExists) + } + + override def renameTable(db: String, oldName: String, newName: String): Unit = withClient { + val newTable = client.getTable(db, oldName).copy(name = newName) + client.alterTable(oldName, newTable) + } + + /** + * Alter a table whose name that matches the one specified in `tableDefinition`, + * assuming the table exists. + * + * Note: As of now, this only supports altering table properties, serde properties, + * and num buckets! + */ + override def alterTable(db: String, tableDefinition: CatalogTable): Unit = withClient { + requireDbMatches(db, tableDefinition) + requireTableExists(db, tableDefinition.name) + client.alterTable(tableDefinition) + } + + override def getTable(db: String, table: String): CatalogTable = withClient { + client.getTable(db, table) + } + + override def listTables(db: String): Seq[String] = withClient { + requireDbExists(db) + client.listTables(db) + } + + override def listTables(db: String, pattern: String): Seq[String] = withClient { + requireDbExists(db) + client.listTables(db, pattern) + } + + // -------------------------------------------------------------------------- + // Partitions + // -------------------------------------------------------------------------- + + override def createPartitions( + db: String, + table: String, + parts: Seq[CatalogTablePartition], + ignoreIfExists: Boolean): Unit = withClient { + requireTableExists(db, table) + client.createPartitions(db, table, parts, ignoreIfExists) + } + + override def dropPartitions( + db: String, + table: String, + parts: Seq[TablePartitionSpec], + ignoreIfNotExists: Boolean): Unit = withClient { + requireTableExists(db, table) + // Note: Unfortunately Hive does not currently support `ignoreIfNotExists` so we + // need to implement it here ourselves. This is currently somewhat expensive because + // we make multiple synchronous calls to Hive for each partition we want to drop. + val partsToDrop = + if (ignoreIfNotExists) { + parts.filter { spec => + try { + getPartition(db, table, spec) + true + } catch { + // Filter out the partitions that do not actually exist + case _: AnalysisException => false + } + } + } else { + parts + } + if (partsToDrop.nonEmpty) { + client.dropPartitions(db, table, partsToDrop) + } + } + + override def renamePartitions( + db: String, + table: String, + specs: Seq[TablePartitionSpec], + newSpecs: Seq[TablePartitionSpec]): Unit = withClient { + client.renamePartitions(db, table, specs, newSpecs) + } + + override def alterPartitions( + db: String, + table: String, + newParts: Seq[CatalogTablePartition]): Unit = withClient { + client.alterPartitions(db, table, newParts) + } + + override def getPartition( + db: String, + table: String, + spec: TablePartitionSpec): CatalogTablePartition = withClient { + client.getPartition(db, table, spec) + } + + override def listPartitions( + db: String, + table: String): Seq[CatalogTablePartition] = withClient { + client.getAllPartitions(db, table) + } + + // -------------------------------------------------------------------------- + // Functions + // -------------------------------------------------------------------------- + + override def createFunction( + db: String, + funcDefinition: CatalogFunction): Unit = withClient { + client.createFunction(db, funcDefinition) + } + + override def dropFunction(db: String, name: String): Unit = withClient { + client.dropFunction(db, name) + } + + override def renameFunction(db: String, oldName: String, newName: String): Unit = withClient { + client.renameFunction(db, oldName, newName) + } + + override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = withClient { + client.alterFunction(db, funcDefinition) + } + + override def getFunction(db: String, funcName: String): CatalogFunction = withClient { + client.getFunction(db, funcName) + } + + override def listFunctions(db: String, pattern: String): Seq[String] = withClient { + client.listFunctions(db, pattern) + } + +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index c222b006a0..3788736fd1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -25,15 +25,16 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.metastore.Warehouse +import org.apache.hadoop.hive.metastore.{TableType => HiveTableType, Warehouse} import org.apache.hadoop.hive.metastore.api.FieldSchema -import org.apache.hadoop.hive.ql.metadata._ +import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable, _} import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.spark.Logging import org.apache.spark.sql.{AnalysisException, SaveMode, SQLContext} import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog} +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ @@ -96,6 +97,8 @@ private[hive] object HiveSerDe { } } + +// TODO: replace this with o.a.s.sql.hive.HiveCatalog once we merge SQLContext and HiveContext private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveContext) extends Catalog with Logging { @@ -107,16 +110,16 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte /** A fully qualified identifier for a table (i.e., database.tableName) */ case class QualifiedTableName(database: String, name: String) - private def getQualifiedTableName(tableIdent: TableIdentifier) = { + private def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = { QualifiedTableName( tableIdent.database.getOrElse(client.currentDatabase).toLowerCase, tableIdent.table.toLowerCase) } - private def getQualifiedTableName(hiveTable: HiveTable) = { + private def getQualifiedTableName(t: CatalogTable): QualifiedTableName = { QualifiedTableName( - hiveTable.specifiedDatabase.getOrElse(client.currentDatabase).toLowerCase, - hiveTable.name.toLowerCase) + t.specifiedDatabase.getOrElse(client.currentDatabase).toLowerCase, + t.name.toLowerCase) } /** A cache of Spark SQL data source tables that have been accessed. */ @@ -175,7 +178,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte // It does not appear that the ql client for the metastore has a way to enumerate all the // SerDe properties directly... - val options = table.serdeProperties + val options = table.storage.serdeProperties val resolvedRelation = ResolvedDataSource( @@ -276,53 +279,54 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte val tableType = if (isExternal) { tableProperties.put("EXTERNAL", "TRUE") - ExternalTable + CatalogTableType.EXTERNAL_TABLE } else { tableProperties.put("EXTERNAL", "FALSE") - ManagedTable + CatalogTableType.MANAGED_TABLE } val maybeSerDe = HiveSerDe.sourceToSerDe(provider, hive.hiveconf) val dataSource = ResolvedDataSource( hive, userSpecifiedSchema, partitionColumns, bucketSpec, provider, options) - def newSparkSQLSpecificMetastoreTable(): HiveTable = { - HiveTable( + def newSparkSQLSpecificMetastoreTable(): CatalogTable = { + CatalogTable( specifiedDatabase = Option(dbName), name = tblName, - schema = Nil, - partitionColumns = Nil, tableType = tableType, - properties = tableProperties.toMap, - serdeProperties = options) + schema = Nil, + storage = CatalogStorageFormat( + locationUri = None, + inputFormat = None, + outputFormat = None, + serde = None, + serdeProperties = options + ), + properties = tableProperties.toMap) } - def newHiveCompatibleMetastoreTable(relation: HadoopFsRelation, serde: HiveSerDe): HiveTable = { - def schemaToHiveColumn(schema: StructType): Seq[HiveColumn] = { - schema.map { field => - HiveColumn( - name = field.name, - hiveType = HiveMetastoreTypes.toMetastoreType(field.dataType), - comment = "") - } - } - + def newHiveCompatibleMetastoreTable( + relation: HadoopFsRelation, + serde: HiveSerDe): CatalogTable = { assert(partitionColumns.isEmpty) assert(relation.partitionColumns.isEmpty) - HiveTable( + CatalogTable( specifiedDatabase = Option(dbName), name = tblName, - schema = schemaToHiveColumn(relation.schema), - partitionColumns = Nil, tableType = tableType, + storage = CatalogStorageFormat( + locationUri = Some(relation.paths.head), + inputFormat = serde.inputFormat, + outputFormat = serde.outputFormat, + serde = serde.serde, + serdeProperties = options + ), + schema = relation.schema.map { f => + CatalogColumn(f.name, HiveMetastoreTypes.toMetastoreType(f.dataType)) + }, properties = tableProperties.toMap, - serdeProperties = options, - location = Some(relation.paths.head), - viewText = None, // TODO We need to place the SQL string here. - inputFormat = serde.inputFormat, - outputFormat = serde.outputFormat, - serde = serde.serde) + viewText = None) // TODO: We need to place the SQL string here } // TODO: Support persisting partitioned data source relations in Hive compatible format @@ -379,7 +383,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte // specific way. try { logInfo(message) - client.createTable(table) + client.createTable(table, ignoreIfExists = false) } catch { case throwable: Throwable => val warningMessage = @@ -387,20 +391,20 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte s"it into Hive metastore in Spark SQL specific format." logWarning(warningMessage, throwable) val sparkSqlSpecificTable = newSparkSQLSpecificMetastoreTable() - client.createTable(sparkSqlSpecificTable) + client.createTable(sparkSqlSpecificTable, ignoreIfExists = false) } case (None, message) => logWarning(message) val hiveTable = newSparkSQLSpecificMetastoreTable() - client.createTable(hiveTable) + client.createTable(hiveTable, ignoreIfExists = false) } } def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = { // Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName) val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent) - new Path(new Path(client.getDatabase(dbName).location), tblName).toString + new Path(new Path(client.getDatabase(dbName).locationUri), tblName).toString } override def tableExists(tableIdent: TableIdentifier): Boolean = { @@ -420,7 +424,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte // Then, if alias is specified, wrap the table with a Subquery using the alias. // Otherwise, wrap the table with a Subquery using the table name. alias.map(a => SubqueryAlias(a, tableWithQualifiers)).getOrElse(tableWithQualifiers) - } else if (table.tableType == VirtualView) { + } else if (table.tableType == CatalogTableType.VIRTUAL_VIEW) { val viewText = table.viewText.getOrElse(sys.error("Invalid view without text.")) alias match { // because hive use things like `_c0` to build the expanded text @@ -429,7 +433,8 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte case Some(aliasText) => SubqueryAlias(aliasText, hive.parseSql(viewText)) } } else { - MetastoreRelation(qualifiedTableName.database, qualifiedTableName.name, alias)(table)(hive) + MetastoreRelation( + qualifiedTableName.database, qualifiedTableName.name, alias)(table, client, hive) } } @@ -602,16 +607,14 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte val schema = if (table.schema.nonEmpty) { table.schema } else { - child.output.map { - attr => new HiveColumn( - attr.name, - HiveMetastoreTypes.toMetastoreType(attr.dataType), null) + child.output.map { a => + CatalogColumn(a.name, HiveMetastoreTypes.toMetastoreType(a.dataType), a.nullable) } } val desc = table.copy(schema = schema) - if (hive.convertCTAS && table.serde.isEmpty) { + if (hive.convertCTAS && table.storage.serde.isEmpty) { // Do the conversion when spark.sql.hive.convertCTAS is true and the query // does not specify any storage format (file format and storage handler). if (table.specifiedDatabase.isDefined) { @@ -632,9 +635,9 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte child ) } else { - val desc = if (table.serde.isEmpty) { + val desc = if (table.storage.serde.isEmpty) { // add default serde - table.copy( + table.withNewStorage( serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) } else { table @@ -744,10 +747,13 @@ private[hive] case class InsertIntoHiveTable( } } -private[hive] case class MetastoreRelation - (databaseName: String, tableName: String, alias: Option[String]) - (val table: HiveTable) - (@transient private val sqlContext: SQLContext) +private[hive] case class MetastoreRelation( + databaseName: String, + tableName: String, + alias: Option[String]) + (val table: CatalogTable, + @transient private val client: HiveClient, + @transient private val sqlContext: SQLContext) extends LeafNode with MultiInstanceRelation with FileRelation { override def equals(other: Any): Boolean = other match { @@ -765,7 +771,12 @@ private[hive] case class MetastoreRelation override protected def otherCopyArgs: Seq[AnyRef] = table :: sqlContext :: Nil - @transient val hiveQlTable: Table = { + private def toHiveColumn(c: CatalogColumn): FieldSchema = { + new FieldSchema(c.name, c.dataType, c.comment.orNull) + } + + // TODO: merge this with HiveClientImpl#toHiveTable + @transient val hiveQlTable: HiveTable = { // We start by constructing an API table as Hive performs several important transformations // internally when converting an API table to a QL table. val tTable = new org.apache.hadoop.hive.metastore.api.Table() @@ -776,27 +787,31 @@ private[hive] case class MetastoreRelation tTable.setParameters(tableParameters) table.properties.foreach { case (k, v) => tableParameters.put(k, v) } - tTable.setTableType(table.tableType.name) + tTable.setTableType(table.tableType match { + case CatalogTableType.EXTERNAL_TABLE => HiveTableType.EXTERNAL_TABLE.toString + case CatalogTableType.MANAGED_TABLE => HiveTableType.MANAGED_TABLE.toString + case CatalogTableType.INDEX_TABLE => HiveTableType.INDEX_TABLE.toString + case CatalogTableType.VIRTUAL_VIEW => HiveTableType.VIRTUAL_VIEW.toString + }) val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor() tTable.setSd(sd) - sd.setCols(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava) - tTable.setPartitionKeys( - table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava) + sd.setCols(table.schema.map(toHiveColumn).asJava) + tTable.setPartitionKeys(table.partitionColumns.map(toHiveColumn).asJava) - table.location.foreach(sd.setLocation) - table.inputFormat.foreach(sd.setInputFormat) - table.outputFormat.foreach(sd.setOutputFormat) + table.storage.locationUri.foreach(sd.setLocation) + table.storage.inputFormat.foreach(sd.setInputFormat) + table.storage.outputFormat.foreach(sd.setOutputFormat) val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo - table.serde.foreach(serdeInfo.setSerializationLib) + table.storage.serde.foreach(serdeInfo.setSerializationLib) sd.setSerdeInfo(serdeInfo) val serdeParameters = new java.util.HashMap[String, String]() - table.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) } + table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) } serdeInfo.setParameters(serdeParameters) - new Table(tTable) + new HiveTable(tTable) } @transient override lazy val statistics: Statistics = Statistics( @@ -821,11 +836,11 @@ private[hive] case class MetastoreRelation // When metastore partition pruning is turned off, we cache the list of all partitions to // mimic the behavior of Spark < 1.5 - lazy val allPartitions = table.getAllPartitions + private lazy val allPartitions: Seq[CatalogTablePartition] = client.getAllPartitions(table) def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = { val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) { - table.getPartitions(predicates) + client.getPartitionsByFilter(table, predicates) } else { allPartitions } @@ -834,23 +849,22 @@ private[hive] case class MetastoreRelation val tPartition = new org.apache.hadoop.hive.metastore.api.Partition tPartition.setDbName(databaseName) tPartition.setTableName(tableName) - tPartition.setValues(p.values.asJava) + tPartition.setValues(p.spec.values.toList.asJava) val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor() tPartition.setSd(sd) - sd.setCols(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava) - - sd.setLocation(p.storage.location) - sd.setInputFormat(p.storage.inputFormat) - sd.setOutputFormat(p.storage.outputFormat) + sd.setCols(table.schema.map(toHiveColumn).asJava) + p.storage.locationUri.foreach(sd.setLocation) + p.storage.inputFormat.foreach(sd.setInputFormat) + p.storage.outputFormat.foreach(sd.setOutputFormat) val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo sd.setSerdeInfo(serdeInfo) // maps and lists should be set only after all elements are ready (see HIVE-7975) - serdeInfo.setSerializationLib(p.storage.serde) + p.storage.serde.foreach(serdeInfo.setSerializationLib) val serdeParameters = new java.util.HashMap[String, String]() - table.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) } + table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) } p.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) } serdeInfo.setParameters(serdeParameters) @@ -877,10 +891,10 @@ private[hive] case class MetastoreRelation hiveQlTable.getMetadata ) - implicit class SchemaAttribute(f: HiveColumn) { + implicit class SchemaAttribute(f: CatalogColumn) { def toAttribute: AttributeReference = AttributeReference( f.name, - HiveMetastoreTypes.toDataType(f.hiveType), + HiveMetastoreTypes.toDataType(f.dataType), // Since data can be dumped in randomly with no validation, everything is nullable. nullable = true )(qualifiers = Seq(alias.getOrElse(tableName))) @@ -901,19 +915,22 @@ private[hive] case class MetastoreRelation val columnOrdinals = AttributeMap(attributes.zipWithIndex) override def inputFiles: Array[String] = { - val partLocations = table.getPartitions(Nil).map(_.storage.location).toArray + val partLocations = client + .getPartitionsByFilter(table, Nil) + .flatMap(_.storage.locationUri) + .toArray if (partLocations.nonEmpty) { partLocations } else { Array( - table.location.getOrElse( + table.storage.locationUri.getOrElse( sys.error(s"Could not get the location of ${table.qualifiedName}."))) } } override def newInstance(): MetastoreRelation = { - MetastoreRelation(databaseName, tableName, alias)(table)(sqlContext) + MetastoreRelation(databaseName, tableName, alias)(table, client, sqlContext) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 752c037a84..5801051353 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe import org.apache.spark.Logging import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.parser._ import org.apache.spark.sql.catalyst.parser.ParseUtils._ @@ -39,7 +40,6 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.SparkQl import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper -import org.apache.spark.sql.hive.client._ import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.types._ import org.apache.spark.sql.AnalysisException @@ -55,7 +55,7 @@ private[hive] case object NativePlaceholder extends LogicalPlan { } private[hive] case class CreateTableAsSelect( - tableDesc: HiveTable, + tableDesc: CatalogTable, child: LogicalPlan, allowExisting: Boolean) extends UnaryNode with Command { @@ -63,14 +63,14 @@ private[hive] case class CreateTableAsSelect( override lazy val resolved: Boolean = tableDesc.specifiedDatabase.isDefined && tableDesc.schema.nonEmpty && - tableDesc.serde.isDefined && - tableDesc.inputFormat.isDefined && - tableDesc.outputFormat.isDefined && + tableDesc.storage.serde.isDefined && + tableDesc.storage.inputFormat.isDefined && + tableDesc.storage.outputFormat.isDefined && childrenResolved } private[hive] case class CreateViewAsSelect( - tableDesc: HiveTable, + tableDesc: CatalogTable, child: LogicalPlan, allowExisting: Boolean, replace: Boolean, @@ -193,7 +193,7 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging view: ASTNode, viewNameParts: ASTNode, query: ASTNode, - schema: Seq[HiveColumn], + schema: Seq[CatalogColumn], properties: Map[String, String], allowExist: Boolean, replace: Boolean): CreateViewAsSelect = { @@ -201,18 +201,20 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging val originalText = query.source - val tableDesc = HiveTable( + val tableDesc = CatalogTable( specifiedDatabase = dbName, name = viewName, + tableType = CatalogTableType.VIRTUAL_VIEW, schema = schema, - partitionColumns = Seq.empty[HiveColumn], + storage = CatalogStorageFormat( + locationUri = None, + inputFormat = None, + outputFormat = None, + serde = None, + serdeProperties = Map.empty[String, String] + ), properties = properties, - serdeProperties = Map[String, String](), - tableType = VirtualView, - location = None, - inputFormat = None, - outputFormat = None, - serde = None, + viewOriginalText = Some(originalText), viewText = Some(originalText)) // We need to keep the original SQL string so that if `spark.sql.nativeView` is @@ -314,8 +316,8 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging val schema = maybeColumns.map { cols => // We can't specify column types when create view, so fill it with null first, and // update it after the schema has been resolved later. - nodeToColumns(cols, lowerCase = true).map(_.copy(hiveType = null)) - }.getOrElse(Seq.empty[HiveColumn]) + nodeToColumns(cols, lowerCase = true).map(_.copy(dataType = null)) + }.getOrElse(Seq.empty[CatalogColumn]) val properties = scala.collection.mutable.Map.empty[String, String] @@ -369,19 +371,23 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging val TableIdentifier(tblName, dbName) = extractTableIdent(tableNameParts) // TODO add bucket support - var tableDesc: HiveTable = HiveTable( + var tableDesc: CatalogTable = CatalogTable( specifiedDatabase = dbName, name = tblName, - schema = Seq.empty[HiveColumn], - partitionColumns = Seq.empty[HiveColumn], - properties = Map[String, String](), - serdeProperties = Map[String, String](), - tableType = if (externalTable.isDefined) ExternalTable else ManagedTable, - location = None, - inputFormat = None, - outputFormat = None, - serde = None, - viewText = None) + tableType = + if (externalTable.isDefined) { + CatalogTableType.EXTERNAL_TABLE + } else { + CatalogTableType.MANAGED_TABLE + }, + storage = CatalogStorageFormat( + locationUri = None, + inputFormat = None, + outputFormat = None, + serde = None, + serdeProperties = Map.empty[String, String] + ), + schema = Seq.empty[CatalogColumn]) // default storage type abbreviation (e.g. RCFile, ORC, PARQUET etc.) val defaultStorageType = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT) @@ -392,9 +398,10 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging outputFormat = Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat")) } - hiveSerDe.inputFormat.foreach(f => tableDesc = tableDesc.copy(inputFormat = Some(f))) - hiveSerDe.outputFormat.foreach(f => tableDesc = tableDesc.copy(outputFormat = Some(f))) - hiveSerDe.serde.foreach(f => tableDesc = tableDesc.copy(serde = Some(f))) + tableDesc = tableDesc.withNewStorage( + inputFormat = hiveSerDe.inputFormat.orElse(tableDesc.storage.inputFormat), + outputFormat = hiveSerDe.outputFormat.orElse(tableDesc.storage.outputFormat), + serde = hiveSerDe.serde.orElse(tableDesc.storage.serde)) children.collect { case list @ Token("TOK_TABCOLLIST", _) => @@ -440,13 +447,13 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging // TODO support the nullFormat case _ => assert(false) } - tableDesc = tableDesc.copy( - serdeProperties = tableDesc.serdeProperties ++ serdeParams.asScala) + tableDesc = tableDesc.withNewStorage( + serdeProperties = tableDesc.storage.serdeProperties ++ serdeParams.asScala) case Token("TOK_TABLELOCATION", child :: Nil) => val location = EximUtil.relativeToAbsolutePath(hiveConf, unescapeSQLString(child.text)) - tableDesc = tableDesc.copy(location = Option(location)) + tableDesc = tableDesc.withNewStorage(locationUri = Option(location)) case Token("TOK_TABLESERIALIZER", child :: Nil) => - tableDesc = tableDesc.copy( + tableDesc = tableDesc.withNewStorage( serde = Option(unescapeSQLString(child.children.head.text))) if (child.numChildren == 2) { // This is based on the readProps(..) method in @@ -459,59 +466,59 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging .orNull (unescapeSQLString(prop), value) }.toMap - tableDesc = tableDesc.copy(serdeProperties = tableDesc.serdeProperties ++ serdeParams) + tableDesc = tableDesc.withNewStorage( + serdeProperties = tableDesc.storage.serdeProperties ++ serdeParams) } case Token("TOK_FILEFORMAT_GENERIC", child :: Nil) => child.text.toLowerCase(Locale.ENGLISH) match { case "orc" => - tableDesc = tableDesc.copy( + tableDesc = tableDesc.withNewStorage( inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"), outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat")) - if (tableDesc.serde.isEmpty) { - tableDesc = tableDesc.copy( + if (tableDesc.storage.serde.isEmpty) { + tableDesc = tableDesc.withNewStorage( serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde")) } case "parquet" => - tableDesc = tableDesc.copy( + tableDesc = tableDesc.withNewStorage( inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"), outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")) - if (tableDesc.serde.isEmpty) { - tableDesc = tableDesc.copy( + if (tableDesc.storage.serde.isEmpty) { + tableDesc = tableDesc.withNewStorage( serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")) } case "rcfile" => - tableDesc = tableDesc.copy( + tableDesc = tableDesc.withNewStorage( inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"), outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat")) - if (tableDesc.serde.isEmpty) { - tableDesc = tableDesc.copy(serde = - Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")) + if (tableDesc.storage.serde.isEmpty) { + tableDesc = tableDesc.withNewStorage( + serde = + Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")) } case "textfile" => - tableDesc = tableDesc.copy( - inputFormat = - Option("org.apache.hadoop.mapred.TextInputFormat"), - outputFormat = - Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat")) + tableDesc = tableDesc.withNewStorage( + inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"), + outputFormat = Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat")) case "sequencefile" => - tableDesc = tableDesc.copy( + tableDesc = tableDesc.withNewStorage( inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"), outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat")) case "avro" => - tableDesc = tableDesc.copy( + tableDesc = tableDesc.withNewStorage( inputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"), outputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat")) - if (tableDesc.serde.isEmpty) { - tableDesc = tableDesc.copy( + if (tableDesc.storage.serde.isEmpty) { + tableDesc = tableDesc.withNewStorage( serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe")) } @@ -522,23 +529,21 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging case Token("TOK_TABLESERIALIZER", Token("TOK_SERDENAME", Token(serdeName, Nil) :: otherProps) :: Nil) => - tableDesc = tableDesc.copy(serde = Option(unquoteString(serdeName))) + tableDesc = tableDesc.withNewStorage(serde = Option(unquoteString(serdeName))) otherProps match { case Token("TOK_TABLEPROPERTIES", list :: Nil) :: Nil => - tableDesc = tableDesc.copy( - serdeProperties = tableDesc.serdeProperties ++ getProperties(list)) + tableDesc = tableDesc.withNewStorage( + serdeProperties = tableDesc.storage.serdeProperties ++ getProperties(list)) case _ => } case Token("TOK_TABLEPROPERTIES", list :: Nil) => tableDesc = tableDesc.copy(properties = tableDesc.properties ++ getProperties(list)) case list @ Token("TOK_TABLEFILEFORMAT", _) => - tableDesc = tableDesc.copy( - inputFormat = - Option(unescapeSQLString(list.children.head.text)), - outputFormat = - Option(unescapeSQLString(list.children(1).text))) + tableDesc = tableDesc.withNewStorage( + inputFormat = Option(unescapeSQLString(list.children.head.text)), + outputFormat = Option(unescapeSQLString(list.children(1).text))) case Token("TOK_STORAGEHANDLER", _) => throw new AnalysisException( "CREATE TABLE AS SELECT cannot be used for a non-native table") @@ -678,15 +683,15 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging // This is based the getColumns methods in // ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java - protected def nodeToColumns(node: ASTNode, lowerCase: Boolean): Seq[HiveColumn] = { + protected def nodeToColumns(node: ASTNode, lowerCase: Boolean): Seq[CatalogColumn] = { node.children.map(_.children).collect { case Token(rawColName, Nil) :: colTypeNode :: comment => - val colName = if (!lowerCase) rawColName - else rawColName.toLowerCase - HiveColumn( - cleanIdentifier(colName), - nodeToTypeString(colTypeNode), - comment.headOption.map(n => unescapeSQLString(n.text)).orNull) + val colName = if (!lowerCase) rawColName else rawColName.toLowerCase + CatalogColumn( + name = cleanIdentifier(colName), + dataType = nodeToTypeString(colTypeNode), + nullable = true, + comment.headOption.map(n => unescapeSQLString(n.text))) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index f681cc6704..6a0a089fd1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -18,67 +18,11 @@ package org.apache.spark.sql.hive.client import java.io.PrintStream -import java.util.{Map => JMap} -import javax.annotation.Nullable -import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException} +import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Expression -private[hive] case class HiveDatabase(name: String, location: String) - -private[hive] abstract class TableType { val name: String } -private[hive] case object ExternalTable extends TableType { override val name = "EXTERNAL_TABLE" } -private[hive] case object IndexTable extends TableType { override val name = "INDEX_TABLE" } -private[hive] case object ManagedTable extends TableType { override val name = "MANAGED_TABLE" } -private[hive] case object VirtualView extends TableType { override val name = "VIRTUAL_VIEW" } - -// TODO: Use this for Tables and Partitions -private[hive] case class HiveStorageDescriptor( - location: String, - inputFormat: String, - outputFormat: String, - serde: String, - serdeProperties: Map[String, String]) - -private[hive] case class HivePartition( - values: Seq[String], - storage: HiveStorageDescriptor) - -private[hive] case class HiveColumn(name: String, @Nullable hiveType: String, comment: String) -private[hive] case class HiveTable( - specifiedDatabase: Option[String], - name: String, - schema: Seq[HiveColumn], - partitionColumns: Seq[HiveColumn], - properties: Map[String, String], - serdeProperties: Map[String, String], - tableType: TableType, - location: Option[String] = None, - inputFormat: Option[String] = None, - outputFormat: Option[String] = None, - serde: Option[String] = None, - viewText: Option[String] = None) { - - @transient - private[client] var client: HiveClient = _ - - private[client] def withClient(ci: HiveClient): this.type = { - client = ci - this - } - - def database: String = specifiedDatabase.getOrElse(sys.error("database not resolved")) - - def isPartitioned: Boolean = partitionColumns.nonEmpty - - def getAllPartitions: Seq[HivePartition] = client.getAllPartitions(this) - - def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] = - client.getPartitionsByFilter(this, predicates) - - // Hive does not support backticks when passing names to the client. - def qualifiedName: String = s"$database.$name" -} /** * An externally visible interface to the Hive client. This interface is shared across both the @@ -106,6 +50,9 @@ private[hive] trait HiveClient { /** Returns the names of all tables in the given database. */ def listTables(dbName: String): Seq[String] + /** Returns the names of tables in the given database that matches the given pattern. */ + def listTables(dbName: String, pattern: String): Seq[String] + /** Returns the name of the active database. */ def currentDatabase: String @@ -113,46 +60,133 @@ private[hive] trait HiveClient { def setCurrentDatabase(databaseName: String): Unit /** Returns the metadata for specified database, throwing an exception if it doesn't exist */ - def getDatabase(name: String): HiveDatabase = { - getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException) + final def getDatabase(name: String): CatalogDatabase = { + getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException(name)) } /** Returns the metadata for a given database, or None if it doesn't exist. */ - def getDatabaseOption(name: String): Option[HiveDatabase] + def getDatabaseOption(name: String): Option[CatalogDatabase] + + /** List the names of all the databases that match the specified pattern. */ + def listDatabases(pattern: String): Seq[String] /** Returns the specified table, or throws [[NoSuchTableException]]. */ - def getTable(dbName: String, tableName: String): HiveTable = { - getTableOption(dbName, tableName).getOrElse(throw new NoSuchTableException) + final def getTable(dbName: String, tableName: String): CatalogTable = { + getTableOption(dbName, tableName).getOrElse(throw new NoSuchTableException(dbName, tableName)) } - /** Returns the metadata for the specified table or None if it doens't exist. */ - def getTableOption(dbName: String, tableName: String): Option[HiveTable] + /** Returns the metadata for the specified table or None if it doesn't exist. */ + def getTableOption(dbName: String, tableName: String): Option[CatalogTable] /** Creates a view with the given metadata. */ - def createView(view: HiveTable): Unit + def createView(view: CatalogTable): Unit /** Updates the given view with new metadata. */ - def alertView(view: HiveTable): Unit + def alertView(view: CatalogTable): Unit /** Creates a table with the given metadata. */ - def createTable(table: HiveTable): Unit + def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit - /** Updates the given table with new metadata. */ - def alterTable(table: HiveTable): Unit + /** Drop the specified table. */ + def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit + + /** Alter a table whose name matches the one specified in `table`, assuming it exists. */ + final def alterTable(table: CatalogTable): Unit = alterTable(table.name, table) + + /** Updates the given table with new metadata, optionally renaming the table. */ + def alterTable(tableName: String, table: CatalogTable): Unit /** Creates a new database with the given name. */ - def createDatabase(database: HiveDatabase): Unit + def createDatabase(database: CatalogDatabase, ignoreIfExists: Boolean): Unit + + /** + * Drop the specified database, if it exists. + * + * @param name database to drop + * @param ignoreIfNotExists if true, do not throw error if the database does not exist + * @param cascade whether to remove all associated objects such as tables and functions + */ + def dropDatabase(name: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit + + /** + * Alter a database whose name matches the one specified in `database`, assuming it exists. + */ + def alterDatabase(database: CatalogDatabase): Unit + + /** + * Create one or many partitions in the given table. + */ + def createPartitions( + db: String, + table: String, + parts: Seq[CatalogTablePartition], + ignoreIfExists: Boolean): Unit + + /** + * Drop one or many partitions in the given table. + * + * Note: Unfortunately, Hive does not currently provide a way to ignore this call if the + * partitions do not already exist. The seemingly relevant flag `ifExists` in + * [[org.apache.hadoop.hive.metastore.PartitionDropOptions]] is not read anywhere. + */ + def dropPartitions( + db: String, + table: String, + specs: Seq[Catalog.TablePartitionSpec]): Unit - /** Returns the specified paritition or None if it does not exist. */ + /** + * Rename one or many existing table partitions, assuming they exist. + */ + def renamePartitions( + db: String, + table: String, + specs: Seq[Catalog.TablePartitionSpec], + newSpecs: Seq[Catalog.TablePartitionSpec]): Unit + + /** + * Alter one or more table partitions whose specs match the ones specified in `newParts`, + * assuming the partitions exist. + */ + def alterPartitions( + db: String, + table: String, + newParts: Seq[CatalogTablePartition]): Unit + + /** Returns the specified partition, or throws [[NoSuchPartitionException]]. */ + final def getPartition( + dbName: String, + tableName: String, + spec: Catalog.TablePartitionSpec): CatalogTablePartition = { + getPartitionOption(dbName, tableName, spec).getOrElse { + throw new NoSuchPartitionException(dbName, tableName, spec) + } + } + + /** Returns the specified partition or None if it does not exist. */ + final def getPartitionOption( + db: String, + table: String, + spec: Catalog.TablePartitionSpec): Option[CatalogTablePartition] = { + getPartitionOption(getTable(db, table), spec) + } + + /** Returns the specified partition or None if it does not exist. */ def getPartitionOption( - hTable: HiveTable, - partitionSpec: JMap[String, String]): Option[HivePartition] + table: CatalogTable, + spec: Catalog.TablePartitionSpec): Option[CatalogTablePartition] + + /** Returns all partitions for the given table. */ + final def getAllPartitions(db: String, table: String): Seq[CatalogTablePartition] = { + getAllPartitions(getTable(db, table)) + } /** Returns all partitions for the given table. */ - def getAllPartitions(hTable: HiveTable): Seq[HivePartition] + def getAllPartitions(table: CatalogTable): Seq[CatalogTablePartition] /** Returns partitions filtered by predicates for the given table. */ - def getPartitionsByFilter(hTable: HiveTable, predicates: Seq[Expression]): Seq[HivePartition] + def getPartitionsByFilter( + table: CatalogTable, + predicates: Seq[Expression]): Seq[CatalogTablePartition] /** Loads a static partition into an existing table. */ def loadPartition( @@ -181,6 +215,29 @@ private[hive] trait HiveClient { holdDDLTime: Boolean, listBucketingEnabled: Boolean): Unit + /** Create a function in an existing database. */ + def createFunction(db: String, func: CatalogFunction): Unit + + /** Drop an existing function an the database. */ + def dropFunction(db: String, name: String): Unit + + /** Rename an existing function in the database. */ + def renameFunction(db: String, oldName: String, newName: String): Unit + + /** Alter a function whose name matches the one specified in `func`, assuming it exists. */ + def alterFunction(db: String, func: CatalogFunction): Unit + + /** Return an existing function in the database, assuming it exists. */ + final def getFunction(db: String, name: String): CatalogFunction = { + getFunctionOption(db, name).getOrElse(throw new NoSuchFunctionException(db, name)) + } + + /** Return an existing function in the database, or None if it doesn't exist. */ + def getFunctionOption(db: String, name: String): Option[CatalogFunction] + + /** Return the names of all functions that match the given pattern in the database. */ + def listFunctions(db: String, pattern: String): Seq[String] + /** Add a jar into class loader */ def addJar(path: String): Unit @@ -192,4 +249,5 @@ private[hive] trait HiveClient { /** Used for testing only. Removes all metadata from this instance of Hive. */ def reset(): Unit + } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index cf1ff55c96..7a007d2acc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -18,24 +18,25 @@ package org.apache.spark.sql.hive.client import java.io.{File, PrintStream} -import java.util.{Map => JMap} import scala.collection.JavaConverters._ import scala.language.reflectiveCalls import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.metastore.{TableType => HTableType} -import org.apache.hadoop.hive.metastore.api.{Database, FieldSchema} -import org.apache.hadoop.hive.ql.{metadata, Driver} -import org.apache.hadoop.hive.ql.metadata.Hive +import org.apache.hadoop.hive.metastore.{TableType => HiveTableType} +import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Function => HiveFunction, FunctionType, PrincipalType, ResourceUri} +import org.apache.hadoop.hive.ql.Driver +import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable} +import org.apache.hadoop.hive.ql.plan.AddPartitionDesc import org.apache.hadoop.hive.ql.processors._ import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hadoop.hive.shims.{HadoopShims, ShimLoader} import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.{Logging, SparkConf, SparkException} -import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException +import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException} +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.util.{CircularBuffer, Utils} @@ -234,167 +235,184 @@ private[hive] class HiveClientImpl( if (getDatabaseOption(databaseName).isDefined) { state.setCurrentDatabase(databaseName) } else { - throw new NoSuchDatabaseException + throw new NoSuchDatabaseException(databaseName) } } - override def createDatabase(database: HiveDatabase): Unit = withHiveState { + override def createDatabase( + database: CatalogDatabase, + ignoreIfExists: Boolean): Unit = withHiveState { client.createDatabase( - new Database( + new HiveDatabase( database.name, - "", - new File(database.location).toURI.toString, - new java.util.HashMap), - true) + database.description, + database.locationUri, + database.properties.asJava), + ignoreIfExists) } - override def getDatabaseOption(name: String): Option[HiveDatabase] = withHiveState { + override def dropDatabase( + name: String, + ignoreIfNotExists: Boolean, + cascade: Boolean): Unit = withHiveState { + client.dropDatabase(name, true, ignoreIfNotExists, cascade) + } + + override def alterDatabase(database: CatalogDatabase): Unit = withHiveState { + client.alterDatabase( + database.name, + new HiveDatabase( + database.name, + database.description, + database.locationUri, + database.properties.asJava)) + } + + override def getDatabaseOption(name: String): Option[CatalogDatabase] = withHiveState { Option(client.getDatabase(name)).map { d => - HiveDatabase( + CatalogDatabase( name = d.getName, - location = d.getLocationUri) + description = d.getDescription, + locationUri = d.getLocationUri, + properties = d.getParameters.asScala.toMap) } } + override def listDatabases(pattern: String): Seq[String] = withHiveState { + client.getDatabasesByPattern(pattern).asScala.toSeq + } + override def getTableOption( dbName: String, - tableName: String): Option[HiveTable] = withHiveState { - + tableName: String): Option[CatalogTable] = withHiveState { logDebug(s"Looking up $dbName.$tableName") - - val hiveTable = Option(client.getTable(dbName, tableName, false)) - val converted = hiveTable.map { h => - - HiveTable( - name = h.getTableName, + Option(client.getTable(dbName, tableName, false)).map { h => + CatalogTable( specifiedDatabase = Option(h.getDbName), - schema = h.getCols.asScala.map(f => HiveColumn(f.getName, f.getType, f.getComment)), - partitionColumns = h.getPartCols.asScala.map(f => - HiveColumn(f.getName, f.getType, f.getComment)), - properties = h.getParameters.asScala.toMap, - serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap, + name = h.getTableName, tableType = h.getTableType match { - case HTableType.MANAGED_TABLE => ManagedTable - case HTableType.EXTERNAL_TABLE => ExternalTable - case HTableType.VIRTUAL_VIEW => VirtualView - case HTableType.INDEX_TABLE => IndexTable + case HiveTableType.EXTERNAL_TABLE => CatalogTableType.EXTERNAL_TABLE + case HiveTableType.MANAGED_TABLE => CatalogTableType.MANAGED_TABLE + case HiveTableType.INDEX_TABLE => CatalogTableType.INDEX_TABLE + case HiveTableType.VIRTUAL_VIEW => CatalogTableType.VIRTUAL_VIEW }, - location = shim.getDataLocation(h), - inputFormat = Option(h.getInputFormatClass).map(_.getName), - outputFormat = Option(h.getOutputFormatClass).map(_.getName), - serde = Option(h.getSerializationLib), - viewText = Option(h.getViewExpandedText)).withClient(this) + schema = h.getCols.asScala.map(fromHiveColumn), + partitionColumns = h.getPartCols.asScala.map(fromHiveColumn), + sortColumns = Seq(), + numBuckets = h.getNumBuckets, + createTime = h.getTTable.getCreateTime.toLong * 1000, + lastAccessTime = h.getLastAccessTime.toLong * 1000, + storage = CatalogStorageFormat( + locationUri = shim.getDataLocation(h), + inputFormat = Option(h.getInputFormatClass).map(_.getName), + outputFormat = Option(h.getOutputFormatClass).map(_.getName), + serde = Option(h.getSerializationLib), + serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap + ), + properties = h.getParameters.asScala.toMap, + viewOriginalText = Option(h.getViewOriginalText), + viewText = Option(h.getViewExpandedText)) } - converted } - private def toInputFormat(name: String) = - Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]] - - private def toOutputFormat(name: String) = - Utils.classForName(name) - .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]] - - private def toQlTable(table: HiveTable): metadata.Table = { - val qlTable = new metadata.Table(table.database, table.name) - - qlTable.setFields(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava) - qlTable.setPartCols( - table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava) - table.properties.foreach { case (k, v) => qlTable.setProperty(k, v) } - table.serdeProperties.foreach { case (k, v) => qlTable.setSerdeParam(k, v) } - - // set owner - qlTable.setOwner(conf.getUser) - // set create time - qlTable.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int]) - - table.location.foreach { loc => shim.setDataLocation(qlTable, loc) } - table.inputFormat.map(toInputFormat).foreach(qlTable.setInputFormatClass) - table.outputFormat.map(toOutputFormat).foreach(qlTable.setOutputFormatClass) - table.serde.foreach(qlTable.setSerializationLib) - - qlTable + override def createView(view: CatalogTable): Unit = withHiveState { + client.createTable(toHiveViewTable(view)) } - private def toViewTable(view: HiveTable): metadata.Table = { - // TODO: this is duplicated with `toQlTable` except the table type stuff. - val tbl = new metadata.Table(view.database, view.name) - tbl.setTableType(HTableType.VIRTUAL_VIEW) - tbl.setSerializationLib(null) - tbl.clearSerDeInfo() - - // TODO: we will save the same SQL string to original and expanded text, which is different - // from Hive. - tbl.setViewOriginalText(view.viewText.get) - tbl.setViewExpandedText(view.viewText.get) - - tbl.setFields(view.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava) - view.properties.foreach { case (k, v) => tbl.setProperty(k, v) } - - // set owner - tbl.setOwner(conf.getUser) - // set create time - tbl.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int]) - - tbl + override def alertView(view: CatalogTable): Unit = withHiveState { + client.alterTable(view.qualifiedName, toHiveViewTable(view)) } - override def createView(view: HiveTable): Unit = withHiveState { - client.createTable(toViewTable(view)) + override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState { + client.createTable(toHiveTable(table), ignoreIfExists) } - override def alertView(view: HiveTable): Unit = withHiveState { - client.alterTable(view.qualifiedName, toViewTable(view)) + override def dropTable( + dbName: String, + tableName: String, + ignoreIfNotExists: Boolean): Unit = withHiveState { + client.dropTable(dbName, tableName, true, ignoreIfNotExists) } - override def createTable(table: HiveTable): Unit = withHiveState { - val qlTable = toQlTable(table) - client.createTable(qlTable) + override def alterTable(tableName: String, table: CatalogTable): Unit = withHiveState { + val hiveTable = toHiveTable(table) + // Do not use `table.qualifiedName` here because this may be a rename + val qualifiedTableName = s"${table.database}.$tableName" + client.alterTable(qualifiedTableName, hiveTable) } - override def alterTable(table: HiveTable): Unit = withHiveState { - val qlTable = toQlTable(table) - client.alterTable(table.qualifiedName, qlTable) + override def createPartitions( + db: String, + table: String, + parts: Seq[CatalogTablePartition], + ignoreIfExists: Boolean): Unit = withHiveState { + val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists) + parts.foreach { s => + addPartitionDesc.addPartition(s.spec.asJava, s.storage.locationUri.orNull) + } + client.createPartitions(addPartitionDesc) + } + + override def dropPartitions( + db: String, + table: String, + specs: Seq[Catalog.TablePartitionSpec]): Unit = withHiveState { + // TODO: figure out how to drop multiple partitions in one call + specs.foreach { s => client.dropPartition(db, table, s.values.toList.asJava, true) } + } + + override def renamePartitions( + db: String, + table: String, + specs: Seq[Catalog.TablePartitionSpec], + newSpecs: Seq[Catalog.TablePartitionSpec]): Unit = withHiveState { + require(specs.size == newSpecs.size, "number of old and new partition specs differ") + val catalogTable = getTable(db, table) + val hiveTable = toHiveTable(catalogTable) + specs.zip(newSpecs).foreach { case (oldSpec, newSpec) => + val hivePart = getPartitionOption(catalogTable, oldSpec) + .map { p => toHivePartition(p.copy(spec = newSpec), hiveTable) } + .getOrElse { throw new NoSuchPartitionException(db, table, oldSpec) } + client.renamePartition(hiveTable, oldSpec.asJava, hivePart) + } } - private def toHivePartition(partition: metadata.Partition): HivePartition = { - val apiPartition = partition.getTPartition - HivePartition( - values = Option(apiPartition.getValues).map(_.asScala).getOrElse(Seq.empty), - storage = HiveStorageDescriptor( - location = apiPartition.getSd.getLocation, - inputFormat = apiPartition.getSd.getInputFormat, - outputFormat = apiPartition.getSd.getOutputFormat, - serde = apiPartition.getSd.getSerdeInfo.getSerializationLib, - serdeProperties = apiPartition.getSd.getSerdeInfo.getParameters.asScala.toMap)) + override def alterPartitions( + db: String, + table: String, + newParts: Seq[CatalogTablePartition]): Unit = withHiveState { + val hiveTable = toHiveTable(getTable(db, table)) + client.alterPartitions(table, newParts.map { p => toHivePartition(p, hiveTable) }.asJava) } override def getPartitionOption( - table: HiveTable, - partitionSpec: JMap[String, String]): Option[HivePartition] = withHiveState { - - val qlTable = toQlTable(table) - val qlPartition = client.getPartition(qlTable, partitionSpec, false) - Option(qlPartition).map(toHivePartition) + table: CatalogTable, + spec: Catalog.TablePartitionSpec): Option[CatalogTablePartition] = withHiveState { + val hiveTable = toHiveTable(table) + val hivePartition = client.getPartition(hiveTable, spec.asJava, false) + Option(hivePartition).map(fromHivePartition) } - override def getAllPartitions(hTable: HiveTable): Seq[HivePartition] = withHiveState { - val qlTable = toQlTable(hTable) - shim.getAllPartitions(client, qlTable).map(toHivePartition) + override def getAllPartitions(table: CatalogTable): Seq[CatalogTablePartition] = withHiveState { + val hiveTable = toHiveTable(table) + shim.getAllPartitions(client, hiveTable).map(fromHivePartition) } override def getPartitionsByFilter( - hTable: HiveTable, - predicates: Seq[Expression]): Seq[HivePartition] = withHiveState { - val qlTable = toQlTable(hTable) - shim.getPartitionsByFilter(client, qlTable, predicates).map(toHivePartition) + table: CatalogTable, + predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState { + val hiveTable = toHiveTable(table) + shim.getPartitionsByFilter(client, hiveTable, predicates).map(fromHivePartition) } override def listTables(dbName: String): Seq[String] = withHiveState { client.getAllTables(dbName).asScala } + override def listTables(dbName: String, pattern: String): Seq[String] = withHiveState { + client.getTablesByPattern(dbName, pattern).asScala + } + /** * Runs the specified SQL query using Hive. */ @@ -508,6 +526,34 @@ private[hive] class HiveClientImpl( listBucketingEnabled) } + override def createFunction(db: String, func: CatalogFunction): Unit = withHiveState { + client.createFunction(toHiveFunction(func, db)) + } + + override def dropFunction(db: String, name: String): Unit = withHiveState { + client.dropFunction(db, name) + } + + override def renameFunction(db: String, oldName: String, newName: String): Unit = withHiveState { + val catalogFunc = getFunction(db, oldName).copy(name = newName) + val hiveFunc = toHiveFunction(catalogFunc, db) + client.alterFunction(db, oldName, hiveFunc) + } + + override def alterFunction(db: String, func: CatalogFunction): Unit = withHiveState { + client.alterFunction(db, func.name, toHiveFunction(func, db)) + } + + override def getFunctionOption( + db: String, + name: String): Option[CatalogFunction] = withHiveState { + Option(client.getFunction(db, name)).map(fromHiveFunction) + } + + override def listFunctions(db: String, pattern: String): Seq[String] = withHiveState { + client.getFunctions(db, pattern).asScala + } + def addJar(path: String): Unit = { val uri = new Path(path).toUri val jarURL = if (uri.getScheme == null) { @@ -541,4 +587,97 @@ private[hive] class HiveClientImpl( client.dropDatabase(db, true, false, true) } } + + + /* -------------------------------------------------------- * + | Helper methods for converting to and from Hive classes | + * -------------------------------------------------------- */ + + private def toInputFormat(name: String) = + Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]] + + private def toOutputFormat(name: String) = + Utils.classForName(name) + .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]] + + private def toHiveFunction(f: CatalogFunction, db: String): HiveFunction = { + new HiveFunction( + f.name, + db, + f.className, + null, + PrincipalType.USER, + (System.currentTimeMillis / 1000).toInt, + FunctionType.JAVA, + List.empty[ResourceUri].asJava) + } + + private def fromHiveFunction(hf: HiveFunction): CatalogFunction = { + new CatalogFunction(hf.getFunctionName, hf.getClassName) + } + + private def toHiveColumn(c: CatalogColumn): FieldSchema = { + new FieldSchema(c.name, c.dataType, c.comment.orNull) + } + + private def fromHiveColumn(hc: FieldSchema): CatalogColumn = { + new CatalogColumn( + name = hc.getName, + dataType = hc.getType, + nullable = true, + comment = Option(hc.getComment)) + } + + private def toHiveTable(table: CatalogTable): HiveTable = { + val hiveTable = new HiveTable(table.database, table.name) + hiveTable.setTableType(table.tableType match { + case CatalogTableType.EXTERNAL_TABLE => HiveTableType.EXTERNAL_TABLE + case CatalogTableType.MANAGED_TABLE => HiveTableType.MANAGED_TABLE + case CatalogTableType.INDEX_TABLE => HiveTableType.INDEX_TABLE + case CatalogTableType.VIRTUAL_VIEW => HiveTableType.VIRTUAL_VIEW + }) + hiveTable.setFields(table.schema.map(toHiveColumn).asJava) + hiveTable.setPartCols(table.partitionColumns.map(toHiveColumn).asJava) + // TODO: set sort columns here too + hiveTable.setOwner(conf.getUser) + hiveTable.setNumBuckets(table.numBuckets) + hiveTable.setCreateTime((table.createTime / 1000).toInt) + hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt) + table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc) } + table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass) + table.storage.outputFormat.map(toOutputFormat).foreach(hiveTable.setOutputFormatClass) + table.storage.serde.foreach(hiveTable.setSerializationLib) + table.storage.serdeProperties.foreach { case (k, v) => hiveTable.setSerdeParam(k, v) } + table.properties.foreach { case (k, v) => hiveTable.setProperty(k, v) } + table.viewOriginalText.foreach { t => hiveTable.setViewOriginalText(t) } + table.viewText.foreach { t => hiveTable.setViewExpandedText(t) } + hiveTable + } + + private def toHiveViewTable(view: CatalogTable): HiveTable = { + val tbl = toHiveTable(view) + tbl.setTableType(HiveTableType.VIRTUAL_VIEW) + tbl.setSerializationLib(null) + tbl.clearSerDeInfo() + tbl + } + + private def toHivePartition( + p: CatalogTablePartition, + ht: HiveTable): HivePartition = { + new HivePartition(ht, p.spec.asJava, p.storage.locationUri.map { l => new Path(l) }.orNull) + } + + private def fromHivePartition(hp: HivePartition): CatalogTablePartition = { + val apiPartition = hp.getTPartition + CatalogTablePartition( + spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty), + storage = CatalogStorageFormat( + locationUri = Option(apiPartition.getSd.getLocation), + inputFormat = Option(apiPartition.getSd.getInputFormat), + outputFormat = Option(apiPartition.getSd.getOutputFormat), + serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib), + serdeProperties = apiPartition.getSd.getSerdeInfo.getParameters.asScala.toMap)) + } + } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala index 4c0aae6c04..3f81c99c41 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala @@ -19,10 +19,10 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.{AnalysisException, Row, SQLContext} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable} import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan} import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, MetastoreRelation} -import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable} /** * Create table and insert the query result into it. @@ -33,7 +33,7 @@ import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable} */ private[hive] case class CreateTableAsSelect( - tableDesc: HiveTable, + tableDesc: CatalogTable, query: LogicalPlan, allowExisting: Boolean) extends RunnableCommand { @@ -51,25 +51,25 @@ case class CreateTableAsSelect( import org.apache.hadoop.mapred.TextInputFormat val withFormat = - tableDesc.copy( + tableDesc.withNewStorage( inputFormat = - tableDesc.inputFormat.orElse(Some(classOf[TextInputFormat].getName)), + tableDesc.storage.inputFormat.orElse(Some(classOf[TextInputFormat].getName)), outputFormat = - tableDesc.outputFormat + tableDesc.storage.outputFormat .orElse(Some(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]].getName)), - serde = tableDesc.serde.orElse(Some(classOf[LazySimpleSerDe].getName()))) + serde = tableDesc.storage.serde.orElse(Some(classOf[LazySimpleSerDe].getName))) val withSchema = if (withFormat.schema.isEmpty) { // Hive doesn't support specifying the column list for target table in CTAS // However we don't think SparkSQL should follow that. - tableDesc.copy(schema = - query.output.map(c => - HiveColumn(c.name, HiveMetastoreTypes.toMetastoreType(c.dataType), null))) + tableDesc.copy(schema = query.output.map { c => + CatalogColumn(c.name, HiveMetastoreTypes.toMetastoreType(c.dataType)) + }) } else { withFormat } - hiveContext.catalog.client.createTable(withSchema) + hiveContext.catalog.client.createTable(withSchema, ignoreIfExists = false) // Get the Metastore Relation hiveContext.catalog.lookupRelation(tableIdentifier, None) match { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala index 5da58a73e1..2914d03749 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala @@ -21,11 +21,11 @@ import scala.util.control.NonFatal import org.apache.spark.sql.{AnalysisException, Row, SQLContext} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable} import org.apache.spark.sql.catalyst.expressions.Alias import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} import org.apache.spark.sql.execution.RunnableCommand -import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, SQLBuilder} -import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable} +import org.apache.spark.sql.hive.{ HiveContext, HiveMetastoreTypes, SQLBuilder} /** * Create Hive view on non-hive-compatible tables by specifying schema ourselves instead of @@ -34,7 +34,7 @@ import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable} // TODO: Note that this class can NOT canonicalize the view SQL string entirely, which is different // from Hive and may not work for some cases like create view on self join. private[hive] case class CreateViewAsSelect( - tableDesc: HiveTable, + tableDesc: CatalogTable, child: LogicalPlan, allowExisting: Boolean, orReplace: Boolean) extends RunnableCommand { @@ -72,7 +72,7 @@ private[hive] case class CreateViewAsSelect( Seq.empty[Row] } - private def prepareTable(sqlContext: SQLContext): HiveTable = { + private def prepareTable(sqlContext: SQLContext): CatalogTable = { val expandedText = if (sqlContext.conf.canonicalView) { try rebuildViewQueryString(sqlContext) catch { case NonFatal(e) => wrapViewTextWithSelect @@ -83,12 +83,16 @@ private[hive] case class CreateViewAsSelect( val viewSchema = { if (tableDesc.schema.isEmpty) { - childSchema.map { attr => - HiveColumn(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), null) + childSchema.map { a => + CatalogColumn(a.name, HiveMetastoreTypes.toMetastoreType(a.dataType)) } } else { - childSchema.zip(tableDesc.schema).map { case (attr, col) => - HiveColumn(col.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), col.comment) + childSchema.zip(tableDesc.schema).map { case (a, col) => + CatalogColumn( + col.name, + HiveMetastoreTypes.toMetastoreType(a.dataType), + nullable = true, + col.comment) } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index feb133d448..d316664241 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -205,7 +205,7 @@ case class InsertIntoHiveTable( val oldPart = catalog.client.getPartitionOption( catalog.client.getTable(table.databaseName, table.tableName), - partitionSpec.asJava) + partitionSpec) if (oldPart.isEmpty || !ifNotExists) { catalog.client.loadPartition( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala new file mode 100644 index 0000000000..f73e7e2351 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.hadoop.util.VersionInfo + +import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.hive.client.{HiveClient, IsolatedClientLoader} +import org.apache.spark.util.Utils + + +/** + * Test suite for the [[HiveCatalog]]. + */ +class HiveCatalogSuite extends CatalogTestCases { + + private val client: HiveClient = { + IsolatedClientLoader.forVersion( + hiveMetastoreVersion = HiveContext.hiveExecutionVersion, + hadoopVersion = VersionInfo.getVersion).createClient() + } + + protected override val tableInputFormat: String = + "org.apache.hadoop.mapred.SequenceFileInputFormat" + protected override val tableOutputFormat: String = + "org.apache.hadoop.mapred.SequenceFileOutputFormat" + + protected override def newUriForDatabase(): String = Utils.createTempDir().getAbsolutePath + + protected override def resetState(): Unit = client.reset() + + protected override def newEmptyCatalog(): Catalog = new HiveCatalog(client) + +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index 14a83d5390..f8764d4725 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -21,7 +21,7 @@ import java.io.File import org.apache.spark.SparkFunSuite import org.apache.spark.sql.{QueryTest, Row, SaveMode, SQLConf} -import org.apache.spark.sql.hive.client.{ExternalTable, ManagedTable} +import org.apache.spark.sql.catalyst.catalog.CatalogTableType import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils} import org.apache.spark.sql.types.{DecimalType, StringType, StructType} @@ -83,16 +83,16 @@ class DataSourceWithHiveMetastoreCatalogSuite } val hiveTable = catalog.client.getTable("default", "t") - assert(hiveTable.inputFormat === Some(inputFormat)) - assert(hiveTable.outputFormat === Some(outputFormat)) - assert(hiveTable.serde === Some(serde)) + assert(hiveTable.storage.inputFormat === Some(inputFormat)) + assert(hiveTable.storage.outputFormat === Some(outputFormat)) + assert(hiveTable.storage.serde === Some(serde)) - assert(!hiveTable.isPartitioned) - assert(hiveTable.tableType === ManagedTable) + assert(hiveTable.partitionColumns.isEmpty) + assert(hiveTable.tableType === CatalogTableType.MANAGED_TABLE) val columns = hiveTable.schema assert(columns.map(_.name) === Seq("d1", "d2")) - assert(columns.map(_.hiveType) === Seq("decimal(10,3)", "string")) + assert(columns.map(_.dataType) === Seq("decimal(10,3)", "string")) checkAnswer(table("t"), testDF) assert(runSqlHive("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2")) @@ -114,16 +114,17 @@ class DataSourceWithHiveMetastoreCatalogSuite } val hiveTable = catalog.client.getTable("default", "t") - assert(hiveTable.inputFormat === Some(inputFormat)) - assert(hiveTable.outputFormat === Some(outputFormat)) - assert(hiveTable.serde === Some(serde)) + assert(hiveTable.storage.inputFormat === Some(inputFormat)) + assert(hiveTable.storage.outputFormat === Some(outputFormat)) + assert(hiveTable.storage.serde === Some(serde)) - assert(hiveTable.tableType === ExternalTable) - assert(hiveTable.location.get === path.toURI.toString.stripSuffix(File.separator)) + assert(hiveTable.tableType === CatalogTableType.EXTERNAL_TABLE) + assert(hiveTable.storage.locationUri === + Some(path.toURI.toString.stripSuffix(File.separator))) val columns = hiveTable.schema assert(columns.map(_.name) === Seq("d1", "d2")) - assert(columns.map(_.hiveType) === Seq("decimal(10,3)", "string")) + assert(columns.map(_.dataType) === Seq("decimal(10,3)", "string")) checkAnswer(table("t"), testDF) assert(runSqlHive("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2")) @@ -143,17 +144,16 @@ class DataSourceWithHiveMetastoreCatalogSuite """.stripMargin) val hiveTable = catalog.client.getTable("default", "t") - assert(hiveTable.inputFormat === Some(inputFormat)) - assert(hiveTable.outputFormat === Some(outputFormat)) - assert(hiveTable.serde === Some(serde)) + assert(hiveTable.storage.inputFormat === Some(inputFormat)) + assert(hiveTable.storage.outputFormat === Some(outputFormat)) + assert(hiveTable.storage.serde === Some(serde)) - assert(hiveTable.isPartitioned === false) - assert(hiveTable.tableType === ExternalTable) - assert(hiveTable.partitionColumns.length === 0) + assert(hiveTable.partitionColumns.isEmpty) + assert(hiveTable.tableType === CatalogTableType.EXTERNAL_TABLE) val columns = hiveTable.schema assert(columns.map(_.name) === Seq("d1", "d2")) - assert(columns.map(_.hiveType) === Seq("int", "string")) + assert(columns.map(_.dataType) === Seq("int", "string")) checkAnswer(table("t"), Row(1, "val_1")) assert(runSqlHive("SELECT * FROM t") === Seq("1\tval_1")) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala index 137dadd6c6..e869c0e2bd 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala @@ -22,15 +22,15 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.SparkFunSuite import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.expressions.JsonTuple import org.apache.spark.sql.catalyst.parser.SimpleParserConf import org.apache.spark.sql.catalyst.plans.logical.Generate -import org.apache.spark.sql.hive.client.{ExternalTable, HiveColumn, HiveTable, ManagedTable} class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll { val parser = new HiveQl(SimpleParserConf()) - private def extractTableDesc(sql: String): (HiveTable, Boolean) = { + private def extractTableDesc(sql: String): (CatalogTable, Boolean) = { parser.parsePlan(sql).collect { case CreateTableAsSelect(desc, child, allowExisting) => (desc, allowExisting) }.head @@ -53,28 +53,29 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll { |AS SELECT * FROM src""".stripMargin val (desc, exists) = extractTableDesc(s1) - assert(exists == true) + assert(exists) assert(desc.specifiedDatabase == Some("mydb")) assert(desc.name == "page_view") - assert(desc.tableType == ExternalTable) - assert(desc.location == Some("/user/external/page_view")) + assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE) + assert(desc.storage.locationUri == Some("/user/external/page_view")) assert(desc.schema == - HiveColumn("viewtime", "int", null) :: - HiveColumn("userid", "bigint", null) :: - HiveColumn("page_url", "string", null) :: - HiveColumn("referrer_url", "string", null) :: - HiveColumn("ip", "string", "IP Address of the User") :: - HiveColumn("country", "string", "country of origination") :: Nil) + CatalogColumn("viewtime", "int") :: + CatalogColumn("userid", "bigint") :: + CatalogColumn("page_url", "string") :: + CatalogColumn("referrer_url", "string") :: + CatalogColumn("ip", "string", comment = Some("IP Address of the User")) :: + CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil) // TODO will be SQLText assert(desc.viewText == Option("This is the staging page view table")) assert(desc.partitionColumns == - HiveColumn("dt", "string", "date type") :: - HiveColumn("hour", "string", "hour of the day") :: Nil) - assert(desc.serdeProperties == + CatalogColumn("dt", "string", comment = Some("date type")) :: + CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil) + assert(desc.storage.serdeProperties == Map((serdeConstants.SERIALIZATION_FORMAT, "\054"), (serdeConstants.FIELD_DELIM, "\054"))) - assert(desc.inputFormat == Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat")) - assert(desc.outputFormat == Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat")) - assert(desc.serde == Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")) + assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat")) + assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat")) + assert(desc.storage.serde == + Some("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")) assert(desc.properties == Map(("p1", "v1"), ("p2", "v2"))) } @@ -98,27 +99,27 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll { |AS SELECT * FROM src""".stripMargin val (desc, exists) = extractTableDesc(s2) - assert(exists == true) + assert(exists) assert(desc.specifiedDatabase == Some("mydb")) assert(desc.name == "page_view") - assert(desc.tableType == ExternalTable) - assert(desc.location == Some("/user/external/page_view")) + assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE) + assert(desc.storage.locationUri == Some("/user/external/page_view")) assert(desc.schema == - HiveColumn("viewtime", "int", null) :: - HiveColumn("userid", "bigint", null) :: - HiveColumn("page_url", "string", null) :: - HiveColumn("referrer_url", "string", null) :: - HiveColumn("ip", "string", "IP Address of the User") :: - HiveColumn("country", "string", "country of origination") :: Nil) + CatalogColumn("viewtime", "int") :: + CatalogColumn("userid", "bigint") :: + CatalogColumn("page_url", "string") :: + CatalogColumn("referrer_url", "string") :: + CatalogColumn("ip", "string", comment = Some("IP Address of the User")) :: + CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil) // TODO will be SQLText assert(desc.viewText == Option("This is the staging page view table")) assert(desc.partitionColumns == - HiveColumn("dt", "string", "date type") :: - HiveColumn("hour", "string", "hour of the day") :: Nil) - assert(desc.serdeProperties == Map()) - assert(desc.inputFormat == Option("parquet.hive.DeprecatedParquetInputFormat")) - assert(desc.outputFormat == Option("parquet.hive.DeprecatedParquetOutputFormat")) - assert(desc.serde == Option("parquet.hive.serde.ParquetHiveSerDe")) + CatalogColumn("dt", "string", comment = Some("date type")) :: + CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil) + assert(desc.storage.serdeProperties == Map()) + assert(desc.storage.inputFormat == Some("parquet.hive.DeprecatedParquetInputFormat")) + assert(desc.storage.outputFormat == Some("parquet.hive.DeprecatedParquetOutputFormat")) + assert(desc.storage.serde == Some("parquet.hive.serde.ParquetHiveSerDe")) assert(desc.properties == Map(("p1", "v1"), ("p2", "v2"))) } @@ -128,14 +129,15 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll { assert(exists == false) assert(desc.specifiedDatabase == None) assert(desc.name == "page_view") - assert(desc.tableType == ManagedTable) - assert(desc.location == None) - assert(desc.schema == Seq.empty[HiveColumn]) + assert(desc.tableType == CatalogTableType.MANAGED_TABLE) + assert(desc.storage.locationUri == None) + assert(desc.schema == Seq.empty[CatalogColumn]) assert(desc.viewText == None) // TODO will be SQLText - assert(desc.serdeProperties == Map()) - assert(desc.inputFormat == Option("org.apache.hadoop.mapred.TextInputFormat")) - assert(desc.outputFormat == Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat")) - assert(desc.serde.isEmpty) + assert(desc.storage.serdeProperties == Map()) + assert(desc.storage.inputFormat == Some("org.apache.hadoop.mapred.TextInputFormat")) + assert(desc.storage.outputFormat == + Some("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat")) + assert(desc.storage.serde.isEmpty) assert(desc.properties == Map()) } @@ -162,14 +164,14 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll { assert(exists == false) assert(desc.specifiedDatabase == None) assert(desc.name == "ctas2") - assert(desc.tableType == ManagedTable) - assert(desc.location == None) - assert(desc.schema == Seq.empty[HiveColumn]) + assert(desc.tableType == CatalogTableType.MANAGED_TABLE) + assert(desc.storage.locationUri == None) + assert(desc.schema == Seq.empty[CatalogColumn]) assert(desc.viewText == None) // TODO will be SQLText - assert(desc.serdeProperties == Map(("serde_p1" -> "p1"), ("serde_p2" -> "p2"))) - assert(desc.inputFormat == Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat")) - assert(desc.outputFormat == Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat")) - assert(desc.serde == Option("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe")) + assert(desc.storage.serdeProperties == Map(("serde_p1" -> "p1"), ("serde_p2" -> "p2"))) + assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat")) + assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat")) + assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe")) assert(desc.properties == Map(("tbl_p1" -> "p11"), ("tbl_p2" -> "p22"))) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index d9e4b020fd..0c288bdf8a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -25,9 +25,9 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation -import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable} import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types._ @@ -724,20 +724,25 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv val tableName = "spark6655" withTable(tableName) { val schema = StructType(StructField("int", IntegerType, true) :: Nil) - val hiveTable = HiveTable( + val hiveTable = CatalogTable( specifiedDatabase = Some("default"), name = tableName, + tableType = CatalogTableType.MANAGED_TABLE, schema = Seq.empty, - partitionColumns = Seq.empty, + storage = CatalogStorageFormat( + locationUri = None, + inputFormat = None, + outputFormat = None, + serde = None, + serdeProperties = Map( + "path" -> catalog.hiveDefaultTableFilePath(TableIdentifier(tableName))) + ), properties = Map( "spark.sql.sources.provider" -> "json", "spark.sql.sources.schema" -> schema.json, - "EXTERNAL" -> "FALSE"), - tableType = ManagedTable, - serdeProperties = Map( - "path" -> catalog.hiveDefaultTableFilePath(TableIdentifier(tableName)))) + "EXTERNAL" -> "FALSE")) - catalog.client.createTable(hiveTable) + catalog.client.createTable(hiveTable, ignoreIfExists = false) invalidateTable(tableName) val actualSchema = table(tableName).schema @@ -916,7 +921,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv // As a proxy for verifying that the table was stored in Hive compatible format, we verify that // each column of the table is of native type StringType. assert(catalog.client.getTable("default", "not_skip_hive_metadata").schema - .forall(column => HiveMetastoreTypes.toDataType(column.hiveType) == StringType)) + .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == StringType)) catalog.createDataSourceTable( tableIdent = TableIdentifier("skip_hive_metadata"), @@ -930,6 +935,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv // As a proxy for verifying that the table was stored in SparkSQL format, we verify that // the table has a column type as array of StringType. assert(catalog.client.getTable("default", "skip_hive_metadata").schema - .forall(column => HiveMetastoreTypes.toDataType(column.hiveType) == ArrayType(StringType))) + .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == ArrayType(StringType))) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala index c2c896e5f6..488f298981 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala @@ -26,9 +26,9 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle private def checkTablePath(dbName: String, tableName: String): Unit = { val metastoreTable = hiveContext.catalog.client.getTable(dbName, tableName) - val expectedPath = hiveContext.catalog.client.getDatabase(dbName).location + "/" + tableName + val expectedPath = hiveContext.catalog.client.getDatabase(dbName).locationUri + "/" + tableName - assert(metastoreTable.serdeProperties("path") === expectedPath) + assert(metastoreTable.storage.serdeProperties("path") === expectedPath) } test(s"saveAsTable() to non-default database - with USE - Overwrite") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 1344a2cc4b..d850d522be 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -22,6 +22,7 @@ import java.io.File import org.apache.hadoop.util.VersionInfo import org.apache.spark.{Logging, SparkFunSuite} +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression} import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.hive.HiveContext @@ -60,8 +61,8 @@ class VersionsSuite extends SparkFunSuite with Logging { hadoopVersion = VersionInfo.getVersion, config = buildConf(), ivyPath = ivyPath).createClient() - val db = new HiveDatabase("default", "") - badClient.createDatabase(db) + val db = new CatalogDatabase("default", "desc", "loc", Map()) + badClient.createDatabase(db, ignoreIfExists = true) } private def getNestedMessages(e: Throwable): String = { @@ -116,29 +117,27 @@ class VersionsSuite extends SparkFunSuite with Logging { } test(s"$version: createDatabase") { - val db = HiveDatabase("default", "") - client.createDatabase(db) + val db = CatalogDatabase("default", "desc", "loc", Map()) + client.createDatabase(db, ignoreIfExists = true) } test(s"$version: createTable") { val table = - HiveTable( + CatalogTable( specifiedDatabase = Option("default"), name = "src", - schema = Seq(HiveColumn("key", "int", "")), - partitionColumns = Seq.empty, - properties = Map.empty, - serdeProperties = Map.empty, - tableType = ManagedTable, - location = None, - inputFormat = - Some(classOf[org.apache.hadoop.mapred.TextInputFormat].getName), - outputFormat = - Some(classOf[org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat[_, _]].getName), - serde = - Some(classOf[org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe].getName())) - - client.createTable(table) + tableType = CatalogTableType.MANAGED_TABLE, + schema = Seq(CatalogColumn("key", "int")), + storage = CatalogStorageFormat( + locationUri = None, + inputFormat = Some(classOf[org.apache.hadoop.mapred.TextInputFormat].getName), + outputFormat = Some( + classOf[org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat[_, _]].getName), + serde = Some(classOf[org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe].getName()), + serdeProperties = Map.empty + )) + + client.createTable(table, ignoreIfExists = false) } test(s"$version: getTable") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala index b91248bfb3..37c01792d9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala @@ -149,7 +149,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter { val (actualScannedColumns, actualPartValues) = plan.collect { case p @ HiveTableScan(columns, relation, _) => val columnNames = columns.map(_.name) - val partValues = if (relation.table.isPartitioned) { + val partValues = if (relation.table.partitionColumns.nonEmpty) { p.prunePartitions(relation.getHiveQlPartitions()).map(_.getValues) } else { Seq.empty |