author     Reynold Xin <rxin@databricks.com>    2016-04-21 17:41:29 -0700
committer  Reynold Xin <rxin@databricks.com>    2016-04-21 17:41:29 -0700
commit     f181aee07c0ee105b2a34581105eeeada7d42363 (patch)
tree       d3dd7f74e0aa2ad9881517ce8c79052bf0863de0 /sql/hive/src/main/scala/org
parent     4e726227a3e68c776ea30b78b7db8d01d00b44d6 (diff)
[SPARK-14821][SQL] Implement AnalyzeTable in sql/core and remove HiveSqlAstBuilder
## What changes were proposed in this pull request?
This patch moves analyze table parsing into SparkSqlAstBuilder and removes HiveSqlAstBuilder.
In order to avoid extensive refactoring, I created a common trait for CatalogRelation and MetastoreRelation, and match on that. In the future we should probably just consolidate the two into a single thing so we don't need this common trait.
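The `CatalogRelation` trait itself lives in sql/catalyst and is therefore outside this diffstat; the following is only a minimal sketch of the shape implied by the `MetastoreRelation` changes below — the `CatalogRelation` and `catalogTable`/`CatalogTable` names come from the diff, everything else is an assumption:

```scala
package org.apache.spark.sql.catalyst.catalog

// Sketch only: a common interface for catalog-backed relations, so that code in
// sql/core (such as the relocated AnalyzeTable command) can pattern-match on it
// without a compile-time dependency on the Hive module.
trait CatalogRelation {
  // Metadata of the underlying catalog/metastore table.
  def catalogTable: CatalogTable
}
```

In the diff below, `MetastoreRelation` renames its `table` constructor value to `catalogTable` and mixes in `CatalogRelation`, which satisfies this member.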
## How was this patch tested?
Updated unit tests.
Author: Reynold Xin <rxin@databricks.com>
Closes #12584 from rxin/SPARK-14821.
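With HiveSqlParser/HiveSqlAstBuilder removed, ANALYZE TABLE statements reach the command through the regular SQL entry point and SparkSqlAstBuilder instead. A usage sketch (the table name `src` is only an example):

```scala
// Illustrative only: the NOSCAN form documented in the removed HiveSqlAstBuilder,
// now parsed by SparkSqlAstBuilder in sql/core. "src" is a placeholder table name.
sqlContext.sql("ANALYZE TABLE src COMPUTE STATISTICS NOSCAN")
```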
Diffstat (limited to 'sql/hive/src/main/scala/org')
5 files changed, 34 insertions, 212 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index ca397910c6..df2b6beac6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -392,7 +392,7 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
     cachedDataSourceTables.getIfPresent(tableIdentifier) match {
       case null => None // Cache miss
       case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
-        val pathsInMetastore = metastoreRelation.table.storage.locationUri.toSeq
+        val pathsInMetastore = metastoreRelation.catalogTable.storage.locationUri.toSeq
         val cachedRelationFileFormatClass = relation.fileFormat.getClass

         expectedFileFormat match {
@@ -467,7 +467,7 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
           Some(partitionSpec))

         val hadoopFsRelation = cached.getOrElse {
-          val paths = new Path(metastoreRelation.table.storage.locationUri.get) :: Nil
+          val paths = new Path(metastoreRelation.catalogTable.storage.locationUri.get) :: Nil
           val fileCatalog = new MetaStoreFileCatalog(hive, paths, partitionSpec)

           val inferredSchema = if (fileType.equals("parquet")) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 6f4332c65f..4db0d78cfc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -24,12 +24,10 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars

 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.Analyzer
-import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlanner
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
-import org.apache.spark.sql.hive.execution.{AnalyzeTable, HiveSqlParser}
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
@@ -106,11 +104,6 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
   }

   /**
-   * Parser for HiveQl query texts.
-   */
-  override lazy val sqlParser: ParserInterface = new HiveSqlParser(conf)
-
-  /**
    * Planner that takes into account Hive-specific strategies.
    */
   override def planner: SparkPlanner = {
@@ -175,17 +168,6 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
   }

   /**
-   * Analyzes the given table in the current database to generate statistics, which will be
-   * used in query optimizations.
-   *
-   * Right now, it only supports Hive tables and it only updates the size of a Hive table
-   * in the Hive metastore.
-   */
-  override def analyze(tableName: String): Unit = {
-    AnalyzeTable(tableName).run(ctx)
-  }
-
-  /**
    * Execute a SQL statement by passing the query text directly to Hive.
    */
   override def runNativeSql(sql: String): Seq[String] = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index a66c325b8f..cd45706841 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc

 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTablePartition, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.parser.DataTypeParser
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
@@ -37,13 +37,13 @@ import org.apache.spark.sql.hive.client.HiveClient

 private[hive] case class MetastoreRelation(
-    databaseName: String,
-    tableName: String,
-    alias: Option[String])
-    (val table: CatalogTable,
-     @transient private val client: HiveClient,
-     @transient private val sqlContext: SQLContext)
-  extends LeafNode with MultiInstanceRelation with FileRelation {
+    databaseName: String,
+    tableName: String,
+    alias: Option[String])
+    (val catalogTable: CatalogTable,
+     @transient private val client: HiveClient,
+     @transient private val sqlContext: SQLContext)
+  extends LeafNode with MultiInstanceRelation with FileRelation with CatalogRelation {

   override def equals(other: Any): Boolean = other match {
     case relation: MetastoreRelation =>
@@ -58,7 +58,7 @@ private[hive] case class MetastoreRelation(
     Objects.hashCode(databaseName, tableName, alias, output)
   }

-  override protected def otherCopyArgs: Seq[AnyRef] = table :: sqlContext :: Nil
+  override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sqlContext :: Nil

   private def toHiveColumn(c: CatalogColumn): FieldSchema = {
     new FieldSchema(c.name, c.dataType, c.comment.orNull)
@@ -69,14 +69,14 @@ private[hive] case class MetastoreRelation(
     // We start by constructing an API table as Hive performs several important transformations
     // internally when converting an API table to a QL table.
     val tTable = new org.apache.hadoop.hive.metastore.api.Table()
-    tTable.setTableName(table.identifier.table)
-    tTable.setDbName(table.database)
+    tTable.setTableName(catalogTable.identifier.table)
+    tTable.setDbName(catalogTable.database)

     val tableParameters = new java.util.HashMap[String, String]()
     tTable.setParameters(tableParameters)
-    table.properties.foreach { case (k, v) => tableParameters.put(k, v) }
+    catalogTable.properties.foreach { case (k, v) => tableParameters.put(k, v) }

-    tTable.setTableType(table.tableType match {
+    tTable.setTableType(catalogTable.tableType match {
       case CatalogTableType.EXTERNAL_TABLE => HiveTableType.EXTERNAL_TABLE.toString
       case CatalogTableType.MANAGED_TABLE => HiveTableType.MANAGED_TABLE.toString
       case CatalogTableType.INDEX_TABLE => HiveTableType.INDEX_TABLE.toString
@@ -87,22 +87,22 @@ private[hive] case class MetastoreRelation(
     tTable.setSd(sd)

     // Note: In Hive the schema and partition columns must be disjoint sets
-    val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
-      table.partitionColumnNames.contains(c.getName)
+    val (partCols, schema) = catalogTable.schema.map(toHiveColumn).partition { c =>
+      catalogTable.partitionColumnNames.contains(c.getName)
     }

     sd.setCols(schema.asJava)
     tTable.setPartitionKeys(partCols.asJava)

-    table.storage.locationUri.foreach(sd.setLocation)
-    table.storage.inputFormat.foreach(sd.setInputFormat)
-    table.storage.outputFormat.foreach(sd.setOutputFormat)
+    catalogTable.storage.locationUri.foreach(sd.setLocation)
+    catalogTable.storage.inputFormat.foreach(sd.setInputFormat)
+    catalogTable.storage.outputFormat.foreach(sd.setOutputFormat)

     val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
-    table.storage.serde.foreach(serdeInfo.setSerializationLib)
+    catalogTable.storage.serde.foreach(serdeInfo.setSerializationLib)
     sd.setSerdeInfo(serdeInfo)

     val serdeParameters = new java.util.HashMap[String, String]()
-    table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+    catalogTable.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
     serdeInfo.setParameters(serdeParameters)

     new HiveTable(tTable)
@@ -130,11 +130,11 @@ private[hive] case class MetastoreRelation(

   // When metastore partition pruning is turned off, we cache the list of all partitions to
   // mimic the behavior of Spark < 1.5
-  private lazy val allPartitions: Seq[CatalogTablePartition] = client.getAllPartitions(table)
+  private lazy val allPartitions: Seq[CatalogTablePartition] = client.getAllPartitions(catalogTable)

   def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
     val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
-      client.getPartitionsByFilter(table, predicates)
+      client.getPartitionsByFilter(catalogTable, predicates)
     } else {
       allPartitions
     }
@@ -147,7 +147,7 @@ private[hive] case class MetastoreRelation(
       val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
       tPartition.setSd(sd)
-      sd.setCols(table.schema.map(toHiveColumn).asJava)
+      sd.setCols(catalogTable.schema.map(toHiveColumn).asJava)
       p.storage.locationUri.foreach(sd.setLocation)
       p.storage.inputFormat.foreach(sd.setInputFormat)
       p.storage.outputFormat.foreach(sd.setOutputFormat)
@@ -158,7 +158,7 @@ private[hive] case class MetastoreRelation(
       p.storage.serde.foreach(serdeInfo.setSerializationLib)

       val serdeParameters = new java.util.HashMap[String, String]()
-      table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+      catalogTable.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
       p.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
       serdeInfo.setParameters(serdeParameters)
@@ -195,12 +195,12 @@ private[hive] case class MetastoreRelation(
   }

   /** PartitionKey attributes */
-  val partitionKeys = table.partitionColumns.map(_.toAttribute)
+  val partitionKeys = catalogTable.partitionColumns.map(_.toAttribute)

   /** Non-partitionKey attributes */
   // TODO: just make this hold the schema itself, not just non-partition columns
-  val attributes = table.schema
-    .filter { c => !table.partitionColumnNames.contains(c.name) }
+  val attributes = catalogTable.schema
+    .filter { c => !catalogTable.partitionColumnNames.contains(c.name) }
     .map(_.toAttribute)

   val output = attributes ++ partitionKeys
@@ -213,19 +213,19 @@ private[hive] case class MetastoreRelation(

   override def inputFiles: Array[String] = {
     val partLocations = client
-      .getPartitionsByFilter(table, Nil)
+      .getPartitionsByFilter(catalogTable, Nil)
       .flatMap(_.storage.locationUri)
       .toArray
     if (partLocations.nonEmpty) {
       partLocations
     } else {
       Array(
-        table.storage.locationUri.getOrElse(
-          sys.error(s"Could not get the location of ${table.qualifiedName}.")))
+        catalogTable.storage.locationUri.getOrElse(
+          sys.error(s"Could not get the location of ${catalogTable.qualifiedName}.")))
     }
   }

   override def newInstance(): MetastoreRelation = {
-    MetastoreRelation(databaseName, tableName, alias)(table, client, sqlContext)
+    MetastoreRelation(databaseName, tableName, alias)(catalogTable, client, sqlContext)
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
deleted file mode 100644
index 35530b9814..0000000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import org.apache.spark.sql.catalyst.parser._
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.HiveNativeCommand
-import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
-
-/**
- * Concrete parser for HiveQl statements.
- */
-class HiveSqlParser(conf: SQLConf) extends AbstractSqlParser {
-
-  val astBuilder = new HiveSqlAstBuilder(conf)
-
-  private val substitutor = new VariableSubstitution(conf)
-
-  protected override def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
-    super.parse(substitutor.substitute(command))(toResult)
-  }
-
-  protected override def nativeCommand(sqlText: String): LogicalPlan = {
-    HiveNativeCommand(substitutor.substitute(sqlText))
-  }
-}
-
-/**
- * Builder that converts an ANTLR ParseTree into a LogicalPlan/Expression/TableIdentifier.
- */
-class HiveSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
-
-  import ParserUtils._
-
-  /**
-   * Create an [[AnalyzeTable]] command. This currently only implements the NOSCAN option (other
-   * options are passed on to Hive) e.g.:
-   * {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS NOSCAN;
-   * }}}
-   */
-  override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) {
-    if (ctx.partitionSpec == null &&
-      ctx.identifier != null &&
-      ctx.identifier.getText.toLowerCase == "noscan") {
-      AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
-    } else {
-      // Always just run the no scan analyze. We should fix this and implement full analyze
-      // command in the future.
-      AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
-    }
-  }
-}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 7e9669af8b..6899f46eec 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -17,109 +17,19 @@
 package org.apache.spark.sql.hive.execution

-import scala.util.control.NonFatal
-
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.common.StatsSetupConst
-import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.MetaStoreUtils

 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSource, HadoopFsRelation, LogicalRelation}
-import org.apache.spark.sql.hive.{HiveSessionState, MetastoreRelation}
+import org.apache.spark.sql.hive.HiveSessionState
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._

-/**
- * Analyzes the given table in the current database to generate statistics, which will be
- * used in query optimizations.
- *
- * Right now, it only supports Hive tables and it only updates the size of a Hive table
- * in the Hive metastore.
- */
-private[hive]
-case class AnalyzeTable(tableName: String) extends RunnableCommand {
-
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
-    val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
-    val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
-
-    relation match {
-      case relation: MetastoreRelation =>
-        val catalogTable: CatalogTable = relation.table
-        // This method is mainly based on
-        // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
-        // in Hive 0.13 (except that we do not use fs.getContentSummary).
-        // TODO: Generalize statistics collection.
-        // TODO: Why fs.getContentSummary returns wrong size on Jenkins?
-        // Can we use fs.getContentSummary in future?
-        // Seems fs.getContentSummary returns wrong table size on Jenkins. So we use
-        // countFileSize to count the table size.
-        val stagingDir = sessionState.metadataHive.getConf(
-          HiveConf.ConfVars.STAGINGDIR.varname,
-          HiveConf.ConfVars.STAGINGDIR.defaultStrVal)
-
-        def calculateTableSize(fs: FileSystem, path: Path): Long = {
-          val fileStatus = fs.getFileStatus(path)
-          val size = if (fileStatus.isDirectory) {
-            fs.listStatus(path)
-              .map { status =>
-                if (!status.getPath().getName().startsWith(stagingDir)) {
-                  calculateTableSize(fs, status.getPath)
-                } else {
-                  0L
-                }
-              }
-              .sum
-          } else {
-            fileStatus.getLen
-          }
-
-          size
-        }
-
-        val tableParameters = catalogTable.properties
-        val oldTotalSize = tableParameters.get("totalSize").map(_.toLong).getOrElse(0L)
-        val newTotalSize =
-          catalogTable.storage.locationUri.map { p =>
-            val path = new Path(p)
-            try {
-              val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-              calculateTableSize(fs, path)
-            } catch {
-              case NonFatal(e) =>
-                logWarning(
-                  s"Failed to get the size of table ${catalogTable.identifier.table} in the " +
-                  s"database ${catalogTable.identifier.database} because of ${e.toString}", e)
-                0L
-            }
-          }.getOrElse(0L)
-
-        // Update the Hive metastore if the total size of the table is different than the size
-        // recorded in the Hive metastore.
-        // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
-        if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
-          sessionState.catalog.alterTable(
-            catalogTable.copy(
-              properties = relation.table.properties +
-                (StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString)))
-        }
-
-      case otherRelation =>
-        throw new UnsupportedOperationException(
-          s"Analyze only works for Hive tables, but $tableName is a ${otherRelation.nodeName}")
-    }
-    Seq.empty[Row]
-  }
-}
-
 private[hive]
 case class CreateMetastoreDataSource(
     tableIdent: TableIdentifier,