From 3394b12c379fe0a423d73dc6316aadca18cd2110 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Sat, 16 Apr 2016 15:35:51 -0700
Subject: [SPARK-14672][SQL] Move HiveContext analyze logic to AnalyzeTable

## What changes were proposed in this pull request?

Move the implementation of `hiveContext.analyze` into the `AnalyzeTable` command.

## How was this patch tested?

Existing tests.

Closes #12429

Author: Yin Huai
Author: Andrew Or

Closes #12448 from yhuai/analyzeTable.
---
 .../org/apache/spark/sql/hive/HiveContext.scala    | 78 +--------------------
 .../apache/spark/sql/hive/execution/commands.scala | 81 +++++++++++++++++++++-
 2 files changed, 81 insertions(+), 78 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index e366743118..71ef99a6a9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -29,12 +29,9 @@ import scala.collection.mutable.HashMap
 import scala.language.implicitConversions
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.common.`type`.HiveDecimal
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.ql.metadata.Table
 import org.apache.hadoop.hive.ql.parse.VariableSubstitution
 import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
 import org.apache.hadoop.util.VersionInfo
@@ -45,13 +42,12 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command.{ExecutedCommand, SetCommand}
 import org.apache.spark.sql.hive.client._
-import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
+import org.apache.spark.sql.hive.execution.{AnalyzeTable, DescribeHiveTableCommand, HiveNativeCommand}
 import org.apache.spark.sql.internal.{SharedState, SQLConf}
 import org.apache.spark.sql.internal.SQLConf._
 import org.apache.spark.sql.types._
@@ -192,77 +188,7 @@ class HiveContext private[hive](
    * @since 1.2.0
    */
   def analyze(tableName: String) {
-    val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
-    val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
-
-    relation match {
-      case relation: MetastoreRelation =>
-        // This method is mainly based on
-        // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
-        // in Hive 0.13 (except that we do not use fs.getContentSummary).
-        // TODO: Generalize statistics collection.
-        // TODO: Why fs.getContentSummary returns wrong size on Jenkins?
-        // Can we use fs.getContentSummary in future?
-        // Seems fs.getContentSummary returns wrong table size on Jenkins. So we use
-        // countFileSize to count the table size.
-        val stagingDir = metadataHive.getConf(HiveConf.ConfVars.STAGINGDIR.varname,
-          HiveConf.ConfVars.STAGINGDIR.defaultStrVal)
-
-        def calculateTableSize(fs: FileSystem, path: Path): Long = {
-          val fileStatus = fs.getFileStatus(path)
-          val size = if (fileStatus.isDirectory) {
-            fs.listStatus(path)
-              .map { status =>
-                if (!status.getPath().getName().startsWith(stagingDir)) {
-                  calculateTableSize(fs, status.getPath)
-                } else {
-                  0L
-                }
-              }
-              .sum
-          } else {
-            fileStatus.getLen
-          }
-
-          size
-        }
-
-        def getFileSizeForTable(conf: HiveConf, table: Table): Long = {
-          val path = table.getPath
-          var size: Long = 0L
-          try {
-            val fs = path.getFileSystem(conf)
-            size = calculateTableSize(fs, path)
-          } catch {
-            case e: Exception =>
-              logWarning(
-                s"Failed to get the size of table ${table.getTableName} in the " +
-                s"database ${table.getDbName} because of ${e.toString}", e)
-              size = 0L
-          }
-
-          size
-        }
-
-        val tableParameters = relation.hiveQlTable.getParameters
-        val oldTotalSize =
-          Option(tableParameters.get(StatsSetupConst.TOTAL_SIZE))
-            .map(_.toLong)
-            .getOrElse(0L)
-        val newTotalSize = getFileSizeForTable(hiveconf, relation.hiveQlTable)
-        // Update the Hive metastore if the total size of the table is different than the size
-        // recorded in the Hive metastore.
-        // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
-        if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
-          sessionState.catalog.alterTable(
-            relation.table.copy(
-              properties = relation.table.properties +
-                (StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString)))
-        }
-      case otherRelation =>
-        throw new UnsupportedOperationException(
-          s"Analyze only works for Hive tables, but $tableName is a ${otherRelation.nodeName}")
-    }
+    AnalyzeTable(tableName).run(self)
   }
 
   override def setConf(key: String, value: String): Unit = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 06badff474..0c06608ff9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -17,7 +17,11 @@
 
 package org.apache.spark.sql.hive.execution
 
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.common.StatsSetupConst
+import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.MetaStoreUtils
+import org.apache.hadoop.hive.ql.metadata.Table
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -26,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSource, LogicalRelation}
-import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 
@@ -41,7 +45,80 @@ private[hive]
 case class AnalyzeTable(tableName: String) extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
-    sqlContext.asInstanceOf[HiveContext].analyze(tableName)
+    val sessionState = sqlContext.sessionState
+    val hiveContext = sqlContext.asInstanceOf[HiveContext]
+    val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
+    val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
+
+    relation match {
+      case relation: MetastoreRelation =>
+        // This method is mainly based on
+        // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
+        // in Hive 0.13 (except that we do not use fs.getContentSummary).
+        // TODO: Generalize statistics collection.
+        // TODO: Why fs.getContentSummary returns wrong size on Jenkins?
+        // Can we use fs.getContentSummary in future?
+        // Seems fs.getContentSummary returns wrong table size on Jenkins. So we use
+        // countFileSize to count the table size.
+        val stagingDir = hiveContext.metadataHive.getConf(
+          HiveConf.ConfVars.STAGINGDIR.varname,
+          HiveConf.ConfVars.STAGINGDIR.defaultStrVal)
+
+        def calculateTableSize(fs: FileSystem, path: Path): Long = {
+          val fileStatus = fs.getFileStatus(path)
+          val size = if (fileStatus.isDirectory) {
+            fs.listStatus(path)
+              .map { status =>
+                if (!status.getPath().getName().startsWith(stagingDir)) {
+                  calculateTableSize(fs, status.getPath)
+                } else {
+                  0L
+                }
+              }
+              .sum
+          } else {
+            fileStatus.getLen
+          }
+
+          size
+        }
+
+        def getFileSizeForTable(conf: HiveConf, table: Table): Long = {
+          val path = table.getPath
+          var size: Long = 0L
+          try {
+            val fs = path.getFileSystem(conf)
+            size = calculateTableSize(fs, path)
+          } catch {
+            case e: Exception =>
+              logWarning(
+                s"Failed to get the size of table ${table.getTableName} in the " +
+                s"database ${table.getDbName} because of ${e.toString}", e)
+              size = 0L
+          }
+
+          size
+        }
+
+        val tableParameters = relation.hiveQlTable.getParameters
+        val oldTotalSize =
+          Option(tableParameters.get(StatsSetupConst.TOTAL_SIZE))
+            .map(_.toLong)
+            .getOrElse(0L)
+        val newTotalSize = getFileSizeForTable(hiveContext.hiveconf, relation.hiveQlTable)
+        // Update the Hive metastore if the total size of the table is different than the size
+        // recorded in the Hive metastore.
+        // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
+        if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
+          sessionState.catalog.alterTable(
+            relation.table.copy(
+              properties = relation.table.properties +
+                (StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString)))
+        }
+      case otherRelation =>
+        throw new UnsupportedOperationException(
+          s"Analyze only works for Hive tables, but $tableName is a ${otherRelation.nodeName}")
+    }
     Seq.empty[Row]
   }
 }
-- 
cgit v1.2.3
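
The heart of the moved logic is the recursive walk that sums file sizes under the table's root directory while skipping Hive staging directories (which hold in-flight write output, not table data). Below is a minimal, self-contained sketch of that walk against the Hadoop `FileSystem` API. The object name `TableSizeSketch` and the hard-coded `.hive-staging` prefix are illustrative assumptions, not part of the patch; the actual command reads the prefix from `HiveConf.ConfVars.STAGINGDIR` via `metadataHive.getConf` and then writes the computed size back to the metastore as `StatsSetupConst.TOTAL_SIZE`.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch only: mirrors the calculateTableSize helper inside AnalyzeTable,
// extracted so it can run outside Spark. Error handling (the try/catch that
// AnalyzeTable wraps around this in getFileSizeForTable) is omitted.
object TableSizeSketch {

  // Recursively sum the byte length of every file under `path`, treating any
  // directory whose name starts with `stagingDirPrefix` as contributing 0 bytes.
  def calculateTableSize(fs: FileSystem, path: Path, stagingDirPrefix: String): Long = {
    val status = fs.getFileStatus(path)
    if (status.isDirectory) {
      fs.listStatus(path).map { child =>
        if (child.getPath.getName.startsWith(stagingDirPrefix)) 0L
        else calculateTableSize(fs, child.getPath, stagingDirPrefix)
      }.sum
    } else {
      status.getLen
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Hypothetical table location; pass a real warehouse path as the first argument.
    val tablePath = new Path(args(0))
    val fs = tablePath.getFileSystem(conf)
    // ".hive-staging" is the default value of hive.exec.stagingdir.
    println(calculateTableSize(fs, tablePath, ".hive-staging"))
  }
}
```

After this patch the delegation runs in one direction: `hiveContext.analyze("tbl")` simply constructs `AnalyzeTable("tbl")` and runs it, and the command only touches the metastore when the freshly computed size is positive and differs from the recorded `totalSize`, mirroring Hive's `StatsTask.aggregateStats()` as the in-diff comment notes.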