From 8e1bb0456db1ad60afa24aa033b574c4a79b9c09 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Thu, 21 Apr 2016 11:54:10 -0700
Subject: [SPARK-14801][SQL] Move MetastoreRelation to its own file

## What changes were proposed in this pull request?

This class is currently in HiveMetastoreCatalog.scala, which is a large file that
makes refactoring and searching for usages difficult. Moving it out so I can then
do SPARK-14799 and make the review of that simpler.

## How was this patch tested?

N/A - this is a straightforward move and should be covered by existing tests.

Author: Reynold Xin

Closes #12567 from rxin/SPARK-14801.
---
 .../spark/sql/hive/HiveMetastoreCatalog.scala      | 206 +-----------------
 .../apache/spark/sql/hive/MetastoreRelation.scala  | 231 +++++++++++++++++++++
 2 files changed, 232 insertions(+), 205 deletions(-)
 create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 3eea6c06ac..8732285dac 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -20,30 +20,21 @@ package org.apache.spark.sql.hive
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-import com.google.common.base.Objects
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.hive.common.StatsSetupConst
-import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
-import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable, _}
-import org.apache.hadoop.hive.ql.plan.TableDesc
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, SaveMode, SQLContext}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.DataTypeParser
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand}
 import org.apache.spark.sql.execution.datasources.{Partition => _, _}
 import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation}
-import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution.HiveNativeCommand
 import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource}
 import org.apache.spark.sql.internal.HiveSerDe
@@ -755,7 +746,7 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
  * An override of the standard HDFS listing based catalog, that overrides the partition spec with
  * the information from the metastore.
  */
-class MetaStoreFileCatalog(
+private[hive] class MetaStoreFileCatalog(
     ctx: SQLContext,
     paths: Seq[Path],
     partitionSpecFromHive: PartitionSpec)
@@ -796,201 +787,6 @@ private[hive] case class InsertIntoHiveTable(
   }
 }
 
-private[hive] case class MetastoreRelation(
-    databaseName: String,
-    tableName: String,
-    alias: Option[String])
-    (val table: CatalogTable,
-     @transient private val client: HiveClient,
-     @transient private val sqlContext: SQLContext)
-  extends LeafNode with MultiInstanceRelation with FileRelation {
-
-  override def equals(other: Any): Boolean = other match {
-    case relation: MetastoreRelation =>
-      databaseName == relation.databaseName &&
-        tableName == relation.tableName &&
-        alias == relation.alias &&
-        output == relation.output
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    Objects.hashCode(databaseName, tableName, alias, output)
-  }
-
-  override protected def otherCopyArgs: Seq[AnyRef] = table :: sqlContext :: Nil
-
-  private def toHiveColumn(c: CatalogColumn): FieldSchema = {
-    new FieldSchema(c.name, c.dataType, c.comment.orNull)
-  }
-
-  // TODO: merge this with HiveClientImpl#toHiveTable
-  @transient val hiveQlTable: HiveTable = {
-    // We start by constructing an API table as Hive performs several important transformations
-    // internally when converting an API table to a QL table.
-    val tTable = new org.apache.hadoop.hive.metastore.api.Table()
-    tTable.setTableName(table.identifier.table)
-    tTable.setDbName(table.database)
-
-    val tableParameters = new java.util.HashMap[String, String]()
-    tTable.setParameters(tableParameters)
-    table.properties.foreach { case (k, v) => tableParameters.put(k, v) }
-
-    tTable.setTableType(table.tableType match {
-      case CatalogTableType.EXTERNAL_TABLE => HiveTableType.EXTERNAL_TABLE.toString
-      case CatalogTableType.MANAGED_TABLE => HiveTableType.MANAGED_TABLE.toString
-      case CatalogTableType.INDEX_TABLE => HiveTableType.INDEX_TABLE.toString
-      case CatalogTableType.VIRTUAL_VIEW => HiveTableType.VIRTUAL_VIEW.toString
-    })
-
-    val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
-    tTable.setSd(sd)
-
-    // Note: In Hive the schema and partition columns must be disjoint sets
-    val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
-      table.partitionColumnNames.contains(c.getName)
-    }
-    sd.setCols(schema.asJava)
-    tTable.setPartitionKeys(partCols.asJava)
-
-    table.storage.locationUri.foreach(sd.setLocation)
-    table.storage.inputFormat.foreach(sd.setInputFormat)
-    table.storage.outputFormat.foreach(sd.setOutputFormat)
-
-    val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
-    table.storage.serde.foreach(serdeInfo.setSerializationLib)
-    sd.setSerdeInfo(serdeInfo)
-
-    val serdeParameters = new java.util.HashMap[String, String]()
-    table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
-    serdeInfo.setParameters(serdeParameters)
-
-    new HiveTable(tTable)
-  }
-
-  @transient override lazy val statistics: Statistics = Statistics(
-    sizeInBytes = {
-      val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
-      val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
-      // TODO: check if this estimate is valid for tables after partition pruning.
-      // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
-      // relatively cheap if parameters for the table are populated into the metastore. An
-      // alternative would be going through Hadoop's FileSystem API, which can be expensive if a lot
-      // of RPCs are involved. Besides `totalSize`, there are also `numFiles`, `numRows`,
-      // `rawDataSize` keys (see StatsSetupConst in Hive) that we can look at in the future.
-      BigInt(
-        // When table is external,`totalSize` is always zero, which will influence join strategy
-        // so when `totalSize` is zero, use `rawDataSize` instead
-        // if the size is still less than zero, we use default size
-        Option(totalSize).map(_.toLong).filter(_ > 0)
-          .getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
-            .getOrElse(sqlContext.conf.defaultSizeInBytes)))
-    }
-  )
-
-  // When metastore partition pruning is turned off, we cache the list of all partitions to
-  // mimic the behavior of Spark < 1.5
-  private lazy val allPartitions: Seq[CatalogTablePartition] = client.getAllPartitions(table)
-
-  def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
-    val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
-      client.getPartitionsByFilter(table, predicates)
-    } else {
-      allPartitions
-    }
-
-    rawPartitions.map { p =>
-      val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
-      tPartition.setDbName(databaseName)
-      tPartition.setTableName(tableName)
-      tPartition.setValues(p.spec.values.toList.asJava)
-
-      val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
-      tPartition.setSd(sd)
-      sd.setCols(table.schema.map(toHiveColumn).asJava)
-      p.storage.locationUri.foreach(sd.setLocation)
-      p.storage.inputFormat.foreach(sd.setInputFormat)
-      p.storage.outputFormat.foreach(sd.setOutputFormat)
-
-      val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
-      sd.setSerdeInfo(serdeInfo)
-      // maps and lists should be set only after all elements are ready (see HIVE-7975)
-      p.storage.serde.foreach(serdeInfo.setSerializationLib)
-
-      val serdeParameters = new java.util.HashMap[String, String]()
-      table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
-      p.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
-      serdeInfo.setParameters(serdeParameters)
-
-      new Partition(hiveQlTable, tPartition)
-    }
-  }
-
-  /** Only compare database and tablename, not alias. */
-  override def sameResult(plan: LogicalPlan): Boolean = {
-    plan match {
-      case mr: MetastoreRelation =>
-        mr.databaseName == databaseName && mr.tableName == tableName
-      case _ => false
-    }
-  }
-
-  val tableDesc = new TableDesc(
-    hiveQlTable.getInputFormatClass,
-    // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because
-    // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to
-    // substitute some output formats, e.g. substituting SequenceFileOutputFormat to
-    // HiveSequenceFileOutputFormat.
-    hiveQlTable.getOutputFormatClass,
-    hiveQlTable.getMetadata
-  )
-
-  implicit class SchemaAttribute(f: CatalogColumn) {
-    def toAttribute: AttributeReference = AttributeReference(
-      f.name,
-      HiveMetastoreTypes.toDataType(f.dataType),
-      // Since data can be dumped in randomly with no validation, everything is nullable.
-      nullable = true
-    )(qualifier = Some(alias.getOrElse(tableName)))
-  }
-
-  /** PartitionKey attributes */
-  val partitionKeys = table.partitionColumns.map(_.toAttribute)
-
-  /** Non-partitionKey attributes */
-  // TODO: just make this hold the schema itself, not just non-partition columns
-  val attributes = table.schema
-    .filter { c => !table.partitionColumnNames.contains(c.name) }
-    .map(_.toAttribute)
-
-  val output = attributes ++ partitionKeys
-
-  /** An attribute map that can be used to lookup original attributes based on expression id. */
-  val attributeMap = AttributeMap(output.map(o => (o, o)))
-
-  /** An attribute map for determining the ordinal for non-partition columns. */
-  val columnOrdinals = AttributeMap(attributes.zipWithIndex)
-
-  override def inputFiles: Array[String] = {
-    val partLocations = client
-      .getPartitionsByFilter(table, Nil)
-      .flatMap(_.storage.locationUri)
-      .toArray
-    if (partLocations.nonEmpty) {
-      partLocations
-    } else {
-      Array(
-        table.storage.locationUri.getOrElse(
-          sys.error(s"Could not get the location of ${table.qualifiedName}.")))
-    }
-  }
-
-
-  override def newInstance(): MetastoreRelation = {
-    MetastoreRelation(databaseName, tableName, alias)(table, client, sqlContext)
-  }
-}
-
 private[hive] object HiveMetastoreTypes {
   def toDataType(metastoreType: String): DataType = DataTypeParser.parse(metastoreType)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
new file mode 100644
index 0000000000..a66c325b8f
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Objects
+import org.apache.hadoop.hive.common.StatsSetupConst
+import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
+import org.apache.hadoop.hive.metastore.api.FieldSchema
+import org.apache.hadoop.hive.ql.metadata.{Partition, Table => HiveTable}
+import org.apache.hadoop.hive.ql.plan.TableDesc
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTablePartition, CatalogTableType}
+import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.parser.DataTypeParser
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.FileRelation
+import org.apache.spark.sql.hive.client.HiveClient
+
+
+private[hive] case class MetastoreRelation(
+    databaseName: String,
+    tableName: String,
+    alias: Option[String])
+    (val table: CatalogTable,
+     @transient private val client: HiveClient,
+     @transient private val sqlContext: SQLContext)
+  extends LeafNode with MultiInstanceRelation with FileRelation {
+
+  override def equals(other: Any): Boolean = other match {
+    case relation: MetastoreRelation =>
+      databaseName == relation.databaseName &&
+        tableName == relation.tableName &&
+        alias == relation.alias &&
+        output == relation.output
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    Objects.hashCode(databaseName, tableName, alias, output)
+  }
+
+  override protected def otherCopyArgs: Seq[AnyRef] = table :: sqlContext :: Nil
+
+  private def toHiveColumn(c: CatalogColumn): FieldSchema = {
+    new FieldSchema(c.name, c.dataType, c.comment.orNull)
+  }
+
+  // TODO: merge this with HiveClientImpl#toHiveTable
+  @transient val hiveQlTable: HiveTable = {
+    // We start by constructing an API table as Hive performs several important transformations
+    // internally when converting an API table to a QL table.
+    val tTable = new org.apache.hadoop.hive.metastore.api.Table()
+    tTable.setTableName(table.identifier.table)
+    tTable.setDbName(table.database)
+
+    val tableParameters = new java.util.HashMap[String, String]()
+    tTable.setParameters(tableParameters)
+    table.properties.foreach { case (k, v) => tableParameters.put(k, v) }
+
+    tTable.setTableType(table.tableType match {
+      case CatalogTableType.EXTERNAL_TABLE => HiveTableType.EXTERNAL_TABLE.toString
+      case CatalogTableType.MANAGED_TABLE => HiveTableType.MANAGED_TABLE.toString
+      case CatalogTableType.INDEX_TABLE => HiveTableType.INDEX_TABLE.toString
+      case CatalogTableType.VIRTUAL_VIEW => HiveTableType.VIRTUAL_VIEW.toString
+    })
+
+    val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
+    tTable.setSd(sd)
+
+    // Note: In Hive the schema and partition columns must be disjoint sets
+    val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
+      table.partitionColumnNames.contains(c.getName)
+    }
+    sd.setCols(schema.asJava)
+    tTable.setPartitionKeys(partCols.asJava)
+
+    table.storage.locationUri.foreach(sd.setLocation)
+    table.storage.inputFormat.foreach(sd.setInputFormat)
+    table.storage.outputFormat.foreach(sd.setOutputFormat)
+
+    val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
+    table.storage.serde.foreach(serdeInfo.setSerializationLib)
+    sd.setSerdeInfo(serdeInfo)
+
+    val serdeParameters = new java.util.HashMap[String, String]()
+    table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+    serdeInfo.setParameters(serdeParameters)
+
+    new HiveTable(tTable)
+  }
+
+  @transient override lazy val statistics: Statistics = Statistics(
+    sizeInBytes = {
+      val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
+      val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
+      // TODO: check if this estimate is valid for tables after partition pruning.
+      // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
+      // relatively cheap if parameters for the table are populated into the metastore. An
+      // alternative would be going through Hadoop's FileSystem API, which can be expensive if a lot
+      // of RPCs are involved. Besides `totalSize`, there are also `numFiles`, `numRows`,
+      // `rawDataSize` keys (see StatsSetupConst in Hive) that we can look at in the future.
+      BigInt(
+        // When table is external,`totalSize` is always zero, which will influence join strategy
+        // so when `totalSize` is zero, use `rawDataSize` instead
+        // if the size is still less than zero, we use default size
+        Option(totalSize).map(_.toLong).filter(_ > 0)
+          .getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
+            .getOrElse(sqlContext.conf.defaultSizeInBytes)))
+    }
+  )
+
+  // When metastore partition pruning is turned off, we cache the list of all partitions to
+  // mimic the behavior of Spark < 1.5
+  private lazy val allPartitions: Seq[CatalogTablePartition] = client.getAllPartitions(table)
+
+  def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
+    val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
+      client.getPartitionsByFilter(table, predicates)
+    } else {
+      allPartitions
+    }
+
+    rawPartitions.map { p =>
+      val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
+      tPartition.setDbName(databaseName)
+      tPartition.setTableName(tableName)
+      tPartition.setValues(p.spec.values.toList.asJava)
+
+      val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
+      tPartition.setSd(sd)
+      sd.setCols(table.schema.map(toHiveColumn).asJava)
+      p.storage.locationUri.foreach(sd.setLocation)
+      p.storage.inputFormat.foreach(sd.setInputFormat)
+      p.storage.outputFormat.foreach(sd.setOutputFormat)
+
+      val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
+      sd.setSerdeInfo(serdeInfo)
+      // maps and lists should be set only after all elements are ready (see HIVE-7975)
+      p.storage.serde.foreach(serdeInfo.setSerializationLib)
+
+      val serdeParameters = new java.util.HashMap[String, String]()
+      table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+      p.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+      serdeInfo.setParameters(serdeParameters)
+
+      new Partition(hiveQlTable, tPartition)
+    }
+  }
+
+  /** Only compare database and tablename, not alias. */
+  override def sameResult(plan: LogicalPlan): Boolean = {
+    plan match {
+      case mr: MetastoreRelation =>
+        mr.databaseName == databaseName && mr.tableName == tableName
+      case _ => false
+    }
+  }
+
+  val tableDesc = new TableDesc(
+    hiveQlTable.getInputFormatClass,
+    // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because
+    // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to
+    // substitute some output formats, e.g. substituting SequenceFileOutputFormat to
+    // HiveSequenceFileOutputFormat.
+    hiveQlTable.getOutputFormatClass,
+    hiveQlTable.getMetadata
+  )
+
+  implicit class SchemaAttribute(f: CatalogColumn) {
+    def toAttribute: AttributeReference = AttributeReference(
+      f.name,
+      DataTypeParser.parse(f.dataType),
+      // Since data can be dumped in randomly with no validation, everything is nullable.
+      nullable = true
+    )(qualifier = Some(alias.getOrElse(tableName)))
+  }
+
+  /** PartitionKey attributes */
+  val partitionKeys = table.partitionColumns.map(_.toAttribute)
+
+  /** Non-partitionKey attributes */
+  // TODO: just make this hold the schema itself, not just non-partition columns
+  val attributes = table.schema
+    .filter { c => !table.partitionColumnNames.contains(c.name) }
+    .map(_.toAttribute)
+
+  val output = attributes ++ partitionKeys
+
+  /** An attribute map that can be used to lookup original attributes based on expression id. */
+  val attributeMap = AttributeMap(output.map(o => (o, o)))
+
+  /** An attribute map for determining the ordinal for non-partition columns. */
+  val columnOrdinals = AttributeMap(attributes.zipWithIndex)
+
+  override def inputFiles: Array[String] = {
+    val partLocations = client
+      .getPartitionsByFilter(table, Nil)
+      .flatMap(_.storage.locationUri)
+      .toArray
+    if (partLocations.nonEmpty) {
+      partLocations
+    } else {
+      Array(
+        table.storage.locationUri.getOrElse(
+          sys.error(s"Could not get the location of ${table.qualifiedName}.")))
+    }
+  }
+
+  override def newInstance(): MetastoreRelation = {
+    MetastoreRelation(databaseName, tableName, alias)(table, client, sqlContext)
+  }
+}
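Note on the size-estimation logic carried over verbatim into the new file: `statistics.sizeInBytes`
prefers the Hive table parameter `totalSize`, falls back to `rawDataSize` when `totalSize` is missing
or zero (common for external tables), and only then uses the session's configured default. The
standalone Scala sketch below illustrates only that fallback order; the `estimateSizeInBytes` helper
and its parameters are hypothetical stand-ins for the table parameters and
`sqlContext.conf.defaultSizeInBytes`, not code from this patch.

    object SizeEstimateSketch {
      // Hypothetical stand-in for MetastoreRelation's sizeInBytes fallback:
      // use totalSize, then rawDataSize, then the configured default, skipping
      // any value that is absent or not strictly positive.
      def estimateSizeInBytes(
          totalSize: Option[String],      // Hive table parameter "totalSize"
          rawDataSize: Option[String],    // Hive table parameter "rawDataSize"
          defaultSizeInBytes: Long): BigInt = {
        BigInt(
          totalSize.map(_.toLong).filter(_ > 0)
            .getOrElse(rawDataSize.map(_.toLong).filter(_ > 0)
              .getOrElse(defaultSizeInBytes)))
      }

      def main(args: Array[String]): Unit = {
        // External table: totalSize is reported as 0, so rawDataSize is used.
        println(estimateSizeInBytes(Some("0"), Some("12345"), Long.MaxValue)) // 12345
        // No statistics at all: the configured default wins.
        println(estimateSizeInBytes(None, None, 1L << 20))                    // 1048576
      }
    }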