author     Reynold Xin <rxin@databricks.com>   2016-04-21 11:54:10 -0700
committer  Reynold Xin <rxin@databricks.com>   2016-04-21 11:54:10 -0700
commit     8e1bb0456db1ad60afa24aa033b574c4a79b9c09 (patch)
tree       39932a62e87443b394862f5127cdef256742e0f5
parent     e4904d870a0e705a3a7d370320e6f8a5f23d5944 (diff)
download   spark-8e1bb0456db1ad60afa24aa033b574c4a79b9c09.tar.gz
           spark-8e1bb0456db1ad60afa24aa033b574c4a79b9c09.tar.bz2
           spark-8e1bb0456db1ad60afa24aa033b574c4a79b9c09.zip
[SPARK-14801][SQL] Move MetastoreRelation to its own file
## What changes were proposed in this pull request?

MetastoreRelation currently lives in HiveMetastoreCatalog.scala, a large file that makes refactoring and searching for usages difficult. This patch moves the class into its own file so that SPARK-14799 can follow and its review stays simpler.

## How was this patch tested?

N/A - this is a straightforward move and should be covered by existing tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #12567 from rxin/SPARK-14801.
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala  206
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala     231
2 files changed, 232 insertions, 205 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 3eea6c06ac..8732285dac 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -20,30 +20,21 @@ package org.apache.spark.sql.hive
import scala.collection.JavaConverters._
import scala.collection.mutable
-import com.google.common.base.Objects
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.hive.common.StatsSetupConst
-import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
-import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable, _}
-import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, SaveMode, SQLContext}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.parser.DataTypeParser
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand}
import org.apache.spark.sql.execution.datasources.{Partition => _, _}
import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation}
-import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.hive.execution.HiveNativeCommand
import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource}
import org.apache.spark.sql.internal.HiveSerDe
@@ -755,7 +746,7 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
* An override of the standard HDFS listing based catalog, that overrides the partition spec with
* the information from the metastore.
*/
-class MetaStoreFileCatalog(
+private[hive] class MetaStoreFileCatalog(
ctx: SQLContext,
paths: Seq[Path],
partitionSpecFromHive: PartitionSpec)
@@ -796,201 +787,6 @@ private[hive] case class InsertIntoHiveTable(
}
}
-private[hive] case class MetastoreRelation(
- databaseName: String,
- tableName: String,
- alias: Option[String])
- (val table: CatalogTable,
- @transient private val client: HiveClient,
- @transient private val sqlContext: SQLContext)
- extends LeafNode with MultiInstanceRelation with FileRelation {
-
- override def equals(other: Any): Boolean = other match {
- case relation: MetastoreRelation =>
- databaseName == relation.databaseName &&
- tableName == relation.tableName &&
- alias == relation.alias &&
- output == relation.output
- case _ => false
- }
-
- override def hashCode(): Int = {
- Objects.hashCode(databaseName, tableName, alias, output)
- }
-
- override protected def otherCopyArgs: Seq[AnyRef] = table :: sqlContext :: Nil
-
- private def toHiveColumn(c: CatalogColumn): FieldSchema = {
- new FieldSchema(c.name, c.dataType, c.comment.orNull)
- }
-
- // TODO: merge this with HiveClientImpl#toHiveTable
- @transient val hiveQlTable: HiveTable = {
- // We start by constructing an API table as Hive performs several important transformations
- // internally when converting an API table to a QL table.
- val tTable = new org.apache.hadoop.hive.metastore.api.Table()
- tTable.setTableName(table.identifier.table)
- tTable.setDbName(table.database)
-
- val tableParameters = new java.util.HashMap[String, String]()
- tTable.setParameters(tableParameters)
- table.properties.foreach { case (k, v) => tableParameters.put(k, v) }
-
- tTable.setTableType(table.tableType match {
- case CatalogTableType.EXTERNAL_TABLE => HiveTableType.EXTERNAL_TABLE.toString
- case CatalogTableType.MANAGED_TABLE => HiveTableType.MANAGED_TABLE.toString
- case CatalogTableType.INDEX_TABLE => HiveTableType.INDEX_TABLE.toString
- case CatalogTableType.VIRTUAL_VIEW => HiveTableType.VIRTUAL_VIEW.toString
- })
-
- val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
- tTable.setSd(sd)
-
- // Note: In Hive the schema and partition columns must be disjoint sets
- val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
- table.partitionColumnNames.contains(c.getName)
- }
- sd.setCols(schema.asJava)
- tTable.setPartitionKeys(partCols.asJava)
-
- table.storage.locationUri.foreach(sd.setLocation)
- table.storage.inputFormat.foreach(sd.setInputFormat)
- table.storage.outputFormat.foreach(sd.setOutputFormat)
-
- val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
- table.storage.serde.foreach(serdeInfo.setSerializationLib)
- sd.setSerdeInfo(serdeInfo)
-
- val serdeParameters = new java.util.HashMap[String, String]()
- table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
- serdeInfo.setParameters(serdeParameters)
-
- new HiveTable(tTable)
- }
-
- @transient override lazy val statistics: Statistics = Statistics(
- sizeInBytes = {
- val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
- val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
- // TODO: check if this estimate is valid for tables after partition pruning.
- // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
- // relatively cheap if parameters for the table are populated into the metastore. An
- // alternative would be going through Hadoop's FileSystem API, which can be expensive if a lot
- // of RPCs are involved. Besides `totalSize`, there are also `numFiles`, `numRows`,
- // `rawDataSize` keys (see StatsSetupConst in Hive) that we can look at in the future.
- BigInt(
- // When table is external,`totalSize` is always zero, which will influence join strategy
- // so when `totalSize` is zero, use `rawDataSize` instead
- // if the size is still less than zero, we use default size
- Option(totalSize).map(_.toLong).filter(_ > 0)
- .getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
- .getOrElse(sqlContext.conf.defaultSizeInBytes)))
- }
- )
-
- // When metastore partition pruning is turned off, we cache the list of all partitions to
- // mimic the behavior of Spark < 1.5
- private lazy val allPartitions: Seq[CatalogTablePartition] = client.getAllPartitions(table)
-
- def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
- val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
- client.getPartitionsByFilter(table, predicates)
- } else {
- allPartitions
- }
-
- rawPartitions.map { p =>
- val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
- tPartition.setDbName(databaseName)
- tPartition.setTableName(tableName)
- tPartition.setValues(p.spec.values.toList.asJava)
-
- val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
- tPartition.setSd(sd)
- sd.setCols(table.schema.map(toHiveColumn).asJava)
- p.storage.locationUri.foreach(sd.setLocation)
- p.storage.inputFormat.foreach(sd.setInputFormat)
- p.storage.outputFormat.foreach(sd.setOutputFormat)
-
- val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
- sd.setSerdeInfo(serdeInfo)
- // maps and lists should be set only after all elements are ready (see HIVE-7975)
- p.storage.serde.foreach(serdeInfo.setSerializationLib)
-
- val serdeParameters = new java.util.HashMap[String, String]()
- table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
- p.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
- serdeInfo.setParameters(serdeParameters)
-
- new Partition(hiveQlTable, tPartition)
- }
- }
-
- /** Only compare database and tablename, not alias. */
- override def sameResult(plan: LogicalPlan): Boolean = {
- plan match {
- case mr: MetastoreRelation =>
- mr.databaseName == databaseName && mr.tableName == tableName
- case _ => false
- }
- }
-
- val tableDesc = new TableDesc(
- hiveQlTable.getInputFormatClass,
- // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because
- // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to
- // substitute some output formats, e.g. substituting SequenceFileOutputFormat to
- // HiveSequenceFileOutputFormat.
- hiveQlTable.getOutputFormatClass,
- hiveQlTable.getMetadata
- )
-
- implicit class SchemaAttribute(f: CatalogColumn) {
- def toAttribute: AttributeReference = AttributeReference(
- f.name,
- HiveMetastoreTypes.toDataType(f.dataType),
- // Since data can be dumped in randomly with no validation, everything is nullable.
- nullable = true
- )(qualifier = Some(alias.getOrElse(tableName)))
- }
-
- /** PartitionKey attributes */
- val partitionKeys = table.partitionColumns.map(_.toAttribute)
-
- /** Non-partitionKey attributes */
- // TODO: just make this hold the schema itself, not just non-partition columns
- val attributes = table.schema
- .filter { c => !table.partitionColumnNames.contains(c.name) }
- .map(_.toAttribute)
-
- val output = attributes ++ partitionKeys
-
- /** An attribute map that can be used to lookup original attributes based on expression id. */
- val attributeMap = AttributeMap(output.map(o => (o, o)))
-
- /** An attribute map for determining the ordinal for non-partition columns. */
- val columnOrdinals = AttributeMap(attributes.zipWithIndex)
-
- override def inputFiles: Array[String] = {
- val partLocations = client
- .getPartitionsByFilter(table, Nil)
- .flatMap(_.storage.locationUri)
- .toArray
- if (partLocations.nonEmpty) {
- partLocations
- } else {
- Array(
- table.storage.locationUri.getOrElse(
- sys.error(s"Could not get the location of ${table.qualifiedName}.")))
- }
- }
-
-
- override def newInstance(): MetastoreRelation = {
- MetastoreRelation(databaseName, tableName, alias)(table, client, sqlContext)
- }
-}
-
private[hive] object HiveMetastoreTypes {
def toDataType(metastoreType: String): DataType = DataTypeParser.parse(metastoreType)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
new file mode 100644
index 0000000000..a66c325b8f
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Objects
+import org.apache.hadoop.hive.common.StatsSetupConst
+import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
+import org.apache.hadoop.hive.metastore.api.FieldSchema
+import org.apache.hadoop.hive.ql.metadata.{Partition, Table => HiveTable}
+import org.apache.hadoop.hive.ql.plan.TableDesc
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTablePartition, CatalogTableType}
+import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.parser.DataTypeParser
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.FileRelation
+import org.apache.spark.sql.hive.client.HiveClient
+
+
+private[hive] case class MetastoreRelation(
+ databaseName: String,
+ tableName: String,
+ alias: Option[String])
+ (val table: CatalogTable,
+ @transient private val client: HiveClient,
+ @transient private val sqlContext: SQLContext)
+ extends LeafNode with MultiInstanceRelation with FileRelation {
+
+ override def equals(other: Any): Boolean = other match {
+ case relation: MetastoreRelation =>
+ databaseName == relation.databaseName &&
+ tableName == relation.tableName &&
+ alias == relation.alias &&
+ output == relation.output
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ Objects.hashCode(databaseName, tableName, alias, output)
+ }
+
+ override protected def otherCopyArgs: Seq[AnyRef] = table :: sqlContext :: Nil
+
+ private def toHiveColumn(c: CatalogColumn): FieldSchema = {
+ new FieldSchema(c.name, c.dataType, c.comment.orNull)
+ }
+
+ // TODO: merge this with HiveClientImpl#toHiveTable
+ @transient val hiveQlTable: HiveTable = {
+ // We start by constructing an API table as Hive performs several important transformations
+ // internally when converting an API table to a QL table.
+ val tTable = new org.apache.hadoop.hive.metastore.api.Table()
+ tTable.setTableName(table.identifier.table)
+ tTable.setDbName(table.database)
+
+ val tableParameters = new java.util.HashMap[String, String]()
+ tTable.setParameters(tableParameters)
+ table.properties.foreach { case (k, v) => tableParameters.put(k, v) }
+
+ tTable.setTableType(table.tableType match {
+ case CatalogTableType.EXTERNAL_TABLE => HiveTableType.EXTERNAL_TABLE.toString
+ case CatalogTableType.MANAGED_TABLE => HiveTableType.MANAGED_TABLE.toString
+ case CatalogTableType.INDEX_TABLE => HiveTableType.INDEX_TABLE.toString
+ case CatalogTableType.VIRTUAL_VIEW => HiveTableType.VIRTUAL_VIEW.toString
+ })
+
+ val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
+ tTable.setSd(sd)
+
+ // Note: In Hive the schema and partition columns must be disjoint sets
+ val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
+ table.partitionColumnNames.contains(c.getName)
+ }
+ sd.setCols(schema.asJava)
+ tTable.setPartitionKeys(partCols.asJava)
+
+ table.storage.locationUri.foreach(sd.setLocation)
+ table.storage.inputFormat.foreach(sd.setInputFormat)
+ table.storage.outputFormat.foreach(sd.setOutputFormat)
+
+ val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
+ table.storage.serde.foreach(serdeInfo.setSerializationLib)
+ sd.setSerdeInfo(serdeInfo)
+
+ val serdeParameters = new java.util.HashMap[String, String]()
+ table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+ serdeInfo.setParameters(serdeParameters)
+
+ new HiveTable(tTable)
+ }
+
+ @transient override lazy val statistics: Statistics = Statistics(
+ sizeInBytes = {
+ val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
+ val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
+ // TODO: check if this estimate is valid for tables after partition pruning.
+ // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
+ // relatively cheap if parameters for the table are populated into the metastore. An
+ // alternative would be going through Hadoop's FileSystem API, which can be expensive if a lot
+ // of RPCs are involved. Besides `totalSize`, there are also `numFiles`, `numRows`,
+ // `rawDataSize` keys (see StatsSetupConst in Hive) that we can look at in the future.
+ BigInt(
+ // When the table is external, `totalSize` is always zero, which would influence the join
+ // strategy. So when `totalSize` is zero, use `rawDataSize` instead; if that is also not
+ // positive, fall back to the default size.
+ Option(totalSize).map(_.toLong).filter(_ > 0)
+ .getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
+ .getOrElse(sqlContext.conf.defaultSizeInBytes)))
+ }
+ )
+
+ // When metastore partition pruning is turned off, we cache the list of all partitions to
+ // mimic the behavior of Spark < 1.5
+ private lazy val allPartitions: Seq[CatalogTablePartition] = client.getAllPartitions(table)
+
+ def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
+ val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
+ client.getPartitionsByFilter(table, predicates)
+ } else {
+ allPartitions
+ }
+
+ rawPartitions.map { p =>
+ val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
+ tPartition.setDbName(databaseName)
+ tPartition.setTableName(tableName)
+ tPartition.setValues(p.spec.values.toList.asJava)
+
+ val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
+ tPartition.setSd(sd)
+ sd.setCols(table.schema.map(toHiveColumn).asJava)
+ p.storage.locationUri.foreach(sd.setLocation)
+ p.storage.inputFormat.foreach(sd.setInputFormat)
+ p.storage.outputFormat.foreach(sd.setOutputFormat)
+
+ val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
+ sd.setSerdeInfo(serdeInfo)
+ // maps and lists should be set only after all elements are ready (see HIVE-7975)
+ p.storage.serde.foreach(serdeInfo.setSerializationLib)
+
+ val serdeParameters = new java.util.HashMap[String, String]()
+ table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+ p.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+ serdeInfo.setParameters(serdeParameters)
+
+ new Partition(hiveQlTable, tPartition)
+ }
+ }
+
+ /** Only compare database and tablename, not alias. */
+ override def sameResult(plan: LogicalPlan): Boolean = {
+ plan match {
+ case mr: MetastoreRelation =>
+ mr.databaseName == databaseName && mr.tableName == tableName
+ case _ => false
+ }
+ }
+
+ val tableDesc = new TableDesc(
+ hiveQlTable.getInputFormatClass,
+ // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because
+ // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to
+ // substitute some output formats, e.g. substituting SequenceFileOutputFormat to
+ // HiveSequenceFileOutputFormat.
+ hiveQlTable.getOutputFormatClass,
+ hiveQlTable.getMetadata
+ )
+
+ implicit class SchemaAttribute(f: CatalogColumn) {
+ def toAttribute: AttributeReference = AttributeReference(
+ f.name,
+ DataTypeParser.parse(f.dataType),
+ // Since data can be dumped in randomly with no validation, everything is nullable.
+ nullable = true
+ )(qualifier = Some(alias.getOrElse(tableName)))
+ }
+
+ /** PartitionKey attributes */
+ val partitionKeys = table.partitionColumns.map(_.toAttribute)
+
+ /** Non-partitionKey attributes */
+ // TODO: just make this hold the schema itself, not just non-partition columns
+ val attributes = table.schema
+ .filter { c => !table.partitionColumnNames.contains(c.name) }
+ .map(_.toAttribute)
+
+ val output = attributes ++ partitionKeys
+
+ /** An attribute map that can be used to lookup original attributes based on expression id. */
+ val attributeMap = AttributeMap(output.map(o => (o, o)))
+
+ /** An attribute map for determining the ordinal for non-partition columns. */
+ val columnOrdinals = AttributeMap(attributes.zipWithIndex)
+
+ override def inputFiles: Array[String] = {
+ val partLocations = client
+ .getPartitionsByFilter(table, Nil)
+ .flatMap(_.storage.locationUri)
+ .toArray
+ if (partLocations.nonEmpty) {
+ partLocations
+ } else {
+ Array(
+ table.storage.locationUri.getOrElse(
+ sys.error(s"Could not get the location of ${table.qualifiedName}.")))
+ }
+ }
+
+ override def newInstance(): MetastoreRelation = {
+ MetastoreRelation(databaseName, tableName, alias)(table, client, sqlContext)
+ }
+}
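
For readers following the statistics logic in the relocated class: the size estimate prefers Hive's `totalSize` table parameter, falls back to `rawDataSize`, and finally to `sqlContext.conf.defaultSizeInBytes`. The standalone Scala sketch below is not part of this commit; the object and method names are made up for illustration, and plain Options stand in for the Hive table parameters and the SQL conf.

```scala
// Illustrative only: mirrors the fallback chain used by MetastoreRelation.statistics.
object SizeEstimateSketch {
  def estimateSizeInBytes(
      totalSize: Option[String],      // value of StatsSetupConst.TOTAL_SIZE, if present
      rawDataSize: Option[String],    // value of StatsSetupConst.RAW_DATA_SIZE, if present
      defaultSizeInBytes: Long): BigInt = {
    BigInt(
      totalSize.map(_.toLong).filter(_ > 0)                // prefer totalSize when positive
        .orElse(rawDataSize.map(_.toLong).filter(_ > 0))   // otherwise rawDataSize when positive
        .getOrElse(defaultSizeInBytes))                    // otherwise the configured default
  }

  def main(args: Array[String]): Unit = {
    // External tables often report totalSize = 0, so rawDataSize is used here.
    println(estimateSizeInBytes(Some("0"), Some("12345"), 1L << 20))  // 12345
    // Neither parameter is usable, so the default is returned.
    println(estimateSizeInBytes(None, None, 1L << 20))                // 1048576
  }
}
```

The original code expresses the same chain with nested `getOrElse` calls instead of `orElse`; the result is equivalent.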