Diffstat (limited to 'sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala')
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 310 |
1 files changed, 220 insertions, 90 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c7066d7363..ccc8345d73 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -25,7 +25,7 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.{TableType => HiveTableType, Warehouse}
+import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
 import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable, _}
 import org.apache.hadoop.hive.ql.plan.TableDesc
@@ -40,12 +40,13 @@ import org.apache.spark.sql.catalyst.parser.DataTypeParser
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.{datasources, FileRelation}
+import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource, ParquetRelation}
+import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation}
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution.HiveNativeCommand
-import org.apache.spark.sql.sources.{HadoopFsRelation, HDFSFileCatalog}
+import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource}
+import org.apache.spark.sql.sources.{FileFormat, HadoopFsRelation, HDFSFileCatalog}
 import org.apache.spark.sql.types._
 
 private[hive] case class HiveSerDe(
@@ -85,7 +86,18 @@ private[hive] object HiveSerDe {
         HiveSerDe(
           inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
           outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
-          serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")))
+          serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")),
+
+      "textfile" ->
+        HiveSerDe(
+          inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"),
+          outputFormat = Option("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
+
+      "avro" ->
+        HiveSerDe(
+          inputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"),
+          outputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"),
+          serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe")))
 
     val key = source.toLowerCase match {
       case s if s.startsWith("org.apache.spark.sql.parquet") => "parquet"
@@ -102,7 +114,7 @@ private[hive] object HiveSerDe {
  * Legacy catalog for interacting with the Hive metastore.
  *
  * This is still used for things like creating data source tables, but in the future will be
- * cleaned up to integrate more nicely with [[HiveCatalog]].
+ * cleaned up to integrate more nicely with [[HiveExternalCatalog]].
  */
 private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveContext)
   extends Logging {
@@ -124,8 +136,8 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
 
   private def getQualifiedTableName(t: CatalogTable): QualifiedTableName = {
     QualifiedTableName(
-      t.name.database.getOrElse(getCurrentDatabase).toLowerCase,
-      t.name.table.toLowerCase)
+      t.identifier.database.getOrElse(getCurrentDatabase).toLowerCase,
+      t.identifier.table.toLowerCase)
   }
 
   /** A cache of Spark SQL data source tables that have been accessed. */
@@ -299,7 +311,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
 
     def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
       CatalogTable(
-        name = TableIdentifier(tblName, Option(dbName)),
+        identifier = TableIdentifier(tblName, Option(dbName)),
         tableType = tableType,
         schema = Nil,
         storage = CatalogStorageFormat(
@@ -319,7 +331,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
       assert(relation.partitionSchema.isEmpty)
 
       CatalogTable(
-        name = TableIdentifier(tblName, Option(dbName)),
+        identifier = TableIdentifier(tblName, Option(dbName)),
        tableType = tableType,
         storage = CatalogStorageFormat(
           locationUri = Some(relation.location.paths.map(_.toUri.toString).head),
@@ -431,7 +443,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
       alias match {
         // because hive use things like `_c0` to build the expanded text
         // currently we cannot support view from "create view v1(c1) as ..."
-        case None => SubqueryAlias(table.name.table, hive.parseSql(viewText))
+        case None => SubqueryAlias(table.identifier.table, hive.parseSql(viewText))
         case Some(aliasText) => SubqueryAlias(aliasText, hive.parseSql(viewText))
       }
     } else {
@@ -440,58 +452,72 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
       }
     }
 
-  private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation = {
-    val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
-    val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
-
-    val parquetOptions = Map(
-      ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString,
-      ParquetRelation.METASTORE_TABLE_NAME -> TableIdentifier(
-        metastoreRelation.tableName,
-        Some(metastoreRelation.databaseName)
-      ).unquotedString
-    )
-    val tableIdentifier =
-      QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
-
-    def getCached(
-        tableIdentifier: QualifiedTableName,
-        pathsInMetastore: Seq[String],
-        schemaInMetastore: StructType,
-        partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
-      cachedDataSourceTables.getIfPresent(tableIdentifier) match {
-        case null => None // Cache miss
-        case logical @ LogicalRelation(parquetRelation: HadoopFsRelation, _, _) =>
-          // If we have the same paths, same schema, and same partition spec,
-          // we will use the cached Parquet Relation.
-          val useCached =
-            parquetRelation.location.paths.map(_.toString).toSet == pathsInMetastore.toSet &&
-              logical.schema.sameType(metastoreSchema) &&
-              parquetRelation.partitionSpec == partitionSpecInMetastore.getOrElse {
-                PartitionSpec(StructType(Nil), Array.empty[datasources.PartitionDirectory])
+  private def getCached(
+      tableIdentifier: QualifiedTableName,
+      metastoreRelation: MetastoreRelation,
+      schemaInMetastore: StructType,
+      expectedFileFormat: Class[_ <: FileFormat],
+      expectedBucketSpec: Option[BucketSpec],
+      partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
+
+    cachedDataSourceTables.getIfPresent(tableIdentifier) match {
+      case null => None // Cache miss
+      case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
+        val pathsInMetastore = metastoreRelation.table.storage.locationUri.toSeq
+        val cachedRelationFileFormatClass = relation.fileFormat.getClass
+
+        expectedFileFormat match {
+          case `cachedRelationFileFormatClass` =>
+            // If we have the same paths, same schema, and same partition spec,
+            // we will use the cached relation.
+            val useCached =
+              relation.location.paths.map(_.toString).toSet == pathsInMetastore.toSet &&
+                logical.schema.sameType(schemaInMetastore) &&
+                relation.bucketSpec == expectedBucketSpec &&
+                relation.partitionSpec == partitionSpecInMetastore.getOrElse {
+                  PartitionSpec(StructType(Nil), Array.empty[PartitionDirectory])
+                }
+
+            if (useCached) {
+              Some(logical)
+            } else {
+              // If the cached relation is not updated, we invalidate it right away.
+              cachedDataSourceTables.invalidate(tableIdentifier)
+              None
             }
-
-          if (useCached) {
-            Some(logical)
-          } else {
-            // If the cached relation is not updated, we invalidate it right away.
+          case _ =>
+            logWarning(
+              s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} " +
+                s"should be stored as $expectedFileFormat. However, we are getting " +
+                s"a ${relation.fileFormat} from the metastore cache. This cached " +
+                s"entry will be invalidated.")
             cachedDataSourceTables.invalidate(tableIdentifier)
             None
-          }
-        case other =>
-          logWarning(
-            s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +
-              s"as Parquet. However, we are getting a $other from the metastore cache. " +
-              s"This cached entry will be invalidated.")
-          cachedDataSourceTables.invalidate(tableIdentifier)
-          None
-      }
+        }
+      case other =>
+        logWarning(
+          s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +
+            s"as $expectedFileFormat. However, we are getting a $other from the metastore cache. " +
+            s"This cached entry will be invalidated.")
+        cachedDataSourceTables.invalidate(tableIdentifier)
+        None
     }
+  }
+
+  private def convertToLogicalRelation(metastoreRelation: MetastoreRelation,
+      options: Map[String, String],
+      defaultSource: FileFormat,
+      fileFormatClass: Class[_ <: FileFormat],
+      fileType: String): LogicalRelation = {
+    val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
+    val tableIdentifier =
+      QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
+    val bucketSpec = None  // We don't support hive bucketed tables, only ones we write out.
     val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
       val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
       val partitionColumnDataTypes = partitionSchema.map(_.dataType)
-      // We're converting the entire table into ParquetRelation, so predicates to Hive metastore
+      // We're converting the entire table into HadoopFsRelation, so predicates to Hive metastore
       // are empty.
       val partitions = metastoreRelation.getHiveQlPartitions().map { p =>
         val location = p.getLocation
@@ -504,54 +530,65 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
 
       val cached = getCached(
         tableIdentifier,
-        metastoreRelation.table.storage.locationUri.toSeq,
+        metastoreRelation,
         metastoreSchema,
+        fileFormatClass,
+        bucketSpec,
         Some(partitionSpec))
 
-      val parquetRelation = cached.getOrElse {
+      val hadoopFsRelation = cached.getOrElse {
         val paths = new Path(metastoreRelation.table.storage.locationUri.get) :: Nil
         val fileCatalog = new MetaStoreFileCatalog(hive, paths, partitionSpec)
-        val format = new DefaultSource()
-        val inferredSchema = format.inferSchema(hive, parquetOptions, fileCatalog.allFiles())
-        val mergedSchema = inferredSchema.map { inferred =>
-          ParquetRelation.mergeMetastoreParquetSchema(metastoreSchema, inferred)
-        }.getOrElse(metastoreSchema)
+        val inferredSchema = if (fileType.equals("parquet")) {
+          val inferredSchema = defaultSource.inferSchema(hive, options, fileCatalog.allFiles())
+          inferredSchema.map { inferred =>
+            ParquetRelation.mergeMetastoreParquetSchema(metastoreSchema, inferred)
+          }.getOrElse(metastoreSchema)
+        } else {
+          defaultSource.inferSchema(hive, options, fileCatalog.allFiles()).get
+        }
 
         val relation = HadoopFsRelation(
           sqlContext = hive,
           location = fileCatalog,
           partitionSchema = partitionSchema,
-          dataSchema = mergedSchema,
-          bucketSpec = None, // We don't support hive bucketed tables, only ones we write out.
-          fileFormat = new DefaultSource(),
-          options = parquetOptions)
+          dataSchema = inferredSchema,
+          bucketSpec = bucketSpec,
+          fileFormat = defaultSource,
+          options = options)
 
         val created = LogicalRelation(relation)
         cachedDataSourceTables.put(tableIdentifier, created)
         created
       }
 
-      parquetRelation
+      hadoopFsRelation
     } else {
       val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
 
-      val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
-      val parquetRelation = cached.getOrElse {
+      val cached = getCached(tableIdentifier,
+        metastoreRelation,
+        metastoreSchema,
+        fileFormatClass,
+        bucketSpec,
+        None)
+      val logicalRelation = cached.getOrElse {
+        val created = LogicalRelation(
           DataSource(
             sqlContext = hive,
             paths = paths,
             userSpecifiedSchema = Some(metastoreRelation.schema),
-            options = parquetOptions,
-            className = "parquet").resolveRelation())
+            bucketSpec = bucketSpec,
+            options = options,
+            className = fileType).resolveRelation())
 
         cachedDataSourceTables.put(tableIdentifier, created)
         created
       }
 
-      parquetRelation
+      logicalRelation
     }
     result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
   }
@@ -561,6 +598,27 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
  * data source relations for better performance.
   */
   object ParquetConversions extends Rule[LogicalPlan] {
+    private def shouldConvertMetastoreParquet(relation: MetastoreRelation): Boolean = {
+      relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") &&
+        hive.convertMetastoreParquet
+    }
+
+    private def convertToParquetRelation(relation: MetastoreRelation): LogicalRelation = {
+      val defaultSource = new ParquetDefaultSource()
+      val fileFormatClass = classOf[ParquetDefaultSource]
+
+      val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
+      val options = Map(
+        ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString,
+        ParquetRelation.METASTORE_TABLE_NAME -> TableIdentifier(
+          relation.tableName,
+          Some(relation.databaseName)
+        ).unquotedString
+      )
+
+      convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "parquet")
+    }
+
     override def apply(plan: LogicalPlan): LogicalPlan = {
       if (!plan.resolved || plan.analyzed) {
         return plan
@@ -570,22 +628,17 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
         // Write path
         case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
           // Inserting into partitioned table is not supported in Parquet data source (yet).
-          if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet &&
-            r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
-          val parquetRelation = convertToParquetRelation(r)
-          InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
+          if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) =>
+          InsertIntoTable(convertToParquetRelation(r), partition, child, overwrite, ifNotExists)
 
         // Write path
         case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
           // Inserting into partitioned table is not supported in Parquet data source (yet).
-          if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet &&
-            r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
-          val parquetRelation = convertToParquetRelation(r)
-          InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
+          if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) =>
+          InsertIntoTable(convertToParquetRelation(r), partition, child, overwrite, ifNotExists)
 
         // Read path
-        case relation: MetastoreRelation if hive.convertMetastoreParquet &&
-          relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+        case relation: MetastoreRelation if shouldConvertMetastoreParquet(relation) =>
           val parquetRelation = convertToParquetRelation(relation)
           SubqueryAlias(relation.alias.getOrElse(relation.tableName), parquetRelation)
       }
@@ -593,6 +646,50 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
   }
 
   /**
+   * When scanning Metastore ORC tables, convert them to ORC data source relations
+   * for better performance.
+   */
+  object OrcConversions extends Rule[LogicalPlan] {
+    private def shouldConvertMetastoreOrc(relation: MetastoreRelation): Boolean = {
+      relation.tableDesc.getSerdeClassName.toLowerCase.contains("orc") &&
+        hive.convertMetastoreOrc
+    }
+
+    private def convertToOrcRelation(relation: MetastoreRelation): LogicalRelation = {
+      val defaultSource = new OrcDefaultSource()
+      val fileFormatClass = classOf[OrcDefaultSource]
+      val options = Map[String, String]()
+
+      convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "orc")
+    }
+
+    override def apply(plan: LogicalPlan): LogicalPlan = {
+      if (!plan.resolved || plan.analyzed) {
+        return plan
+      }
+
+      plan transformUp {
+        // Write path
+        case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
+          // Inserting into partitioned table is not supported in Orc data source (yet).
+          if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) =>
+          InsertIntoTable(convertToOrcRelation(r), partition, child, overwrite, ifNotExists)
+
+        // Write path
+        case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
+          // Inserting into partitioned table is not supported in Orc data source (yet).
+          if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) =>
+          InsertIntoTable(convertToOrcRelation(r), partition, child, overwrite, ifNotExists)
+
+        // Read path
+        case relation: MetastoreRelation if shouldConvertMetastoreOrc(relation) =>
+          val orcRelation = convertToOrcRelation(relation)
+          SubqueryAlias(relation.alias.getOrElse(relation.tableName), orcRelation)
+      }
+    }
+  }
+
+  /**
    * Creates any tables required for query execution.
    * For example, because of a CREATE TABLE X AS statement.
    */
@@ -611,7 +708,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
         val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
 
         execution.CreateViewAsSelect(
-          table.copy(name = TableIdentifier(tblName, Some(dbName))),
+          table.copy(identifier = TableIdentifier(tblName, Some(dbName))),
           child,
           allowExisting,
           replace)
@@ -633,7 +730,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
         if (hive.convertCTAS && table.storage.serde.isEmpty) {
           // Do the conversion when spark.sql.hive.convertCTAS is true and the query
           // does not specify any storage format (file format and storage handler).
-          if (table.name.database.isDefined) {
+          if (table.identifier.database.isDefined) {
             throw new AnalysisException(
               "Cannot specify database name in a CTAS statement " +
                 "when spark.sql.hive.convertCTAS is set to true.")
@@ -641,7 +738,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
           val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
 
           CreateTableUsingAsSelect(
-            TableIdentifier(desc.name.table),
+            TableIdentifier(desc.identifier.table),
             conf.defaultDataSourceName,
             temporary = false,
             Array.empty[String],
@@ -662,7 +759,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
           val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
 
           execution.CreateTableAsSelect(
-            desc.copy(name = TableIdentifier(tblName, Some(dbName))),
+            desc.copy(identifier = TableIdentifier(tblName, Some(dbName))),
             child,
             allowExisting)
         }
@@ -792,7 +889,7 @@ private[hive] case class MetastoreRelation(
     // We start by constructing an API table as Hive performs several important transformations
     // internally when converting an API table to a QL table.
     val tTable = new org.apache.hadoop.hive.metastore.api.Table()
-    tTable.setTableName(table.name.table)
+    tTable.setTableName(table.identifier.table)
     tTable.setDbName(table.database)
 
     val tableParameters = new java.util.HashMap[String, String]()
@@ -808,8 +905,13 @@ private[hive] case class MetastoreRelation(
 
     val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
     tTable.setSd(sd)
-    sd.setCols(table.schema.map(toHiveColumn).asJava)
-    tTable.setPartitionKeys(table.partitionColumns.map(toHiveColumn).asJava)
+
+    // Note: In Hive the schema and partition columns must be disjoint sets
+    val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
+      table.partitionColumnNames.contains(c.getName)
+    }
+    sd.setCols(schema.asJava)
+    tTable.setPartitionKeys(partCols.asJava)
 
     table.storage.locationUri.foreach(sd.setLocation)
     table.storage.inputFormat.foreach(sd.setInputFormat)
@@ -916,7 +1018,10 @@ private[hive] case class MetastoreRelation(
   val partitionKeys = table.partitionColumns.map(_.toAttribute)
 
   /** Non-partitionKey attributes */
-  val attributes = table.schema.map(_.toAttribute)
+  // TODO: just make this hold the schema itself, not just non-partition columns
+  val attributes = table.schema
+    .filter { c => !table.partitionColumnNames.contains(c.name) }
+    .map(_.toAttribute)
 
   val output = attributes ++ partitionKeys
 
@@ -977,3 +1082,28 @@ private[hive] object HiveMetastoreTypes {
     case udt: UserDefinedType[_] => toMetastoreType(udt.sqlType)
   }
 }
+
+private[hive] case class CreateTableAsSelect(
+    tableDesc: CatalogTable,
+    child: LogicalPlan,
+    allowExisting: Boolean) extends UnaryNode with Command {
+
+  override def output: Seq[Attribute] = Seq.empty[Attribute]
+  override lazy val resolved: Boolean =
+    tableDesc.identifier.database.isDefined &&
+      tableDesc.schema.nonEmpty &&
+      tableDesc.storage.serde.isDefined &&
+      tableDesc.storage.inputFormat.isDefined &&
+      tableDesc.storage.outputFormat.isDefined &&
+      childrenResolved
+}
+
+private[hive] case class CreateViewAsSelect(
+    tableDesc: CatalogTable,
+    child: LogicalPlan,
+    allowExisting: Boolean,
+    replace: Boolean,
+    sql: String) extends UnaryNode with Command {
+  override def output: Seq[Attribute] = Seq.empty[Attribute]
+  override lazy val resolved: Boolean = false
+}
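A minimal usage sketch of what the new OrcConversions rule is intended to enable (illustrative only, not part of the diff above). It assumes a HiveContext instance named hiveContext, that the hive.convertMetastoreOrc flag checked in the rule is backed by the spark.sql.hive.convertMetastoreOrc configuration key, and a hypothetical table named orc_demo_t:

    // With the rule active, reading a metastore ORC table should be planned against the
    // ORC FileFormat (a HadoopFsRelation scan) rather than a HiveTableScan, mirroring what
    // ParquetConversions already does for Parquet-backed metastore tables.
    hiveContext.setConf("spark.sql.hive.convertMetastoreOrc", "true")
    hiveContext.sql("CREATE TABLE IF NOT EXISTS orc_demo_t (id INT, name STRING) STORED AS ORC")
    hiveContext.sql("INSERT INTO TABLE orc_demo_t SELECT 1, 'a'")
    hiveContext.table("orc_demo_t").explain(true)

The Parquet read path keeps its schema-merging behaviour: ParquetConversions still passes ParquetRelation.MERGE_SCHEMA and the metastore table name through the shared convertToLogicalRelation helper, while the ORC path passes an empty options map.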