diff options
author | Reynold Xin <rxin@databricks.com> | 2016-04-21 17:41:29 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-04-21 17:41:29 -0700 |
commit | f181aee07c0ee105b2a34581105eeeada7d42363 (patch) | |
tree | d3dd7f74e0aa2ad9881517ce8c79052bf0863de0 /sql/core | |
parent | 4e726227a3e68c776ea30b78b7db8d01d00b44d6 (diff) | |
download | spark-f181aee07c0ee105b2a34581105eeeada7d42363.tar.gz spark-f181aee07c0ee105b2a34581105eeeada7d42363.tar.bz2 spark-f181aee07c0ee105b2a34581105eeeada7d42363.zip |
[SPARK-14821][SQL] Implement AnalyzeTable in sql/core and remove HiveSqlAstBuilder
## What changes were proposed in this pull request?
This patch moves analyze table parsing into SparkSqlAstBuilder and removes HiveSqlAstBuilder.
To avoid extensive refactoring, I created a common trait for CatalogRelation and MetastoreRelation, and matched on that trait. In the future we should probably consolidate the two into a single class so that this common trait is no longer needed.
## How was this patch tested?
Updated unit tests.
Author: Reynold Xin <rxin@databricks.com>
Closes #12584 from rxin/SPARK-14821.
Diffstat (limited to 'sql/core')
3 files changed, 139 insertions, 1 deletion
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 05fb1ef631..cae6430693 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -87,6 +87,25 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** + * Create an [[AnalyzeTable]] command. This currently only implements the NOSCAN option (other + * options are passed on to Hive) e.g.: + * {{{ + * ANALYZE TABLE table COMPUTE STATISTICS NOSCAN; + * }}} + */ + override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) { + if (ctx.partitionSpec == null && + ctx.identifier != null && + ctx.identifier.getText.toLowerCase == "noscan") { + AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString) + } else { + // Always just run the no scan analyze. We should fix this and implement full analyze + // command in the future. + AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString) + } + } + + /** * Create a [[SetDatabaseCommand]] logical plan. */ override def visitUse(ctx: UseContext): LogicalPlan = withOrigin(ctx) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala new file mode 100644 index 0000000000..7fa246ba51 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.util.control.NonFatal + +import org.apache.hadoop.fs.{FileSystem, Path} + +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} + + +/** + * Analyzes the given table in the current database to generate statistics, which will be + * used in query optimizations. + * + * Right now, it only supports Hive tables and it only updates the size of a Hive table + * in the Hive metastore. + */ +case class AnalyzeTable(tableName: String) extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + val sessionState = sqlContext.sessionState + val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName) + val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent)) + + relation match { + case relation: CatalogRelation => + val catalogTable: CatalogTable = relation.catalogTable + // This method is mainly based on + // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table) + // in Hive 0.13 (except that we do not use fs.getContentSummary). + // TODO: Generalize statistics collection. + // TODO: Why fs.getContentSummary returns wrong size on Jenkins? + // Can we use fs.getContentSummary in future? + // Seems fs.getContentSummary returns wrong table size on Jenkins. So we use + // countFileSize to count the table size. 
+ val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging") + + def calculateTableSize(fs: FileSystem, path: Path): Long = { + val fileStatus = fs.getFileStatus(path) + val size = if (fileStatus.isDirectory) { + fs.listStatus(path) + .map { status => + if (!status.getPath.getName.startsWith(stagingDir)) { + calculateTableSize(fs, status.getPath) + } else { + 0L + } + }.sum + } else { + fileStatus.getLen + } + + size + } + + val tableParameters = catalogTable.properties + val oldTotalSize = tableParameters.get("totalSize").map(_.toLong).getOrElse(0L) + val newTotalSize = + catalogTable.storage.locationUri.map { p => + val path = new Path(p) + try { + val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + calculateTableSize(fs, path) + } catch { + case NonFatal(e) => + logWarning( + s"Failed to get the size of table ${catalogTable.identifier.table} in the " + + s"database ${catalogTable.identifier.database} because of ${e.toString}", e) + 0L + } + }.getOrElse(0L) + + // Update the Hive metastore if the total size of the table is different than the size + // recorded in the Hive metastore. + // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats(). 
+ if (newTotalSize > 0 && newTotalSize != oldTotalSize) { + sessionState.catalog.alterTable( + catalogTable.copy( + properties = relation.catalogTable.properties + + (AnalyzeTable.TOTAL_SIZE_FIELD -> newTotalSize.toString))) + } + + case otherRelation => + throw new UnsupportedOperationException( + s"Analyze only works for Hive tables, but $tableName is a ${otherRelation.nodeName}") + } + Seq.empty[Row] + } +} + +object AnalyzeTable { + val TOTAL_SIZE_FIELD = "totalSize" +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index e1be4b882f..c423b84957 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.command.AnalyzeTable import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, PreInsertCastAndRename, ResolveDataSource} import org.apache.spark.sql.util.ExecutionListenerManager @@ -162,8 +163,15 @@ private[sql] class SessionState(ctx: SQLContext) { ctx.sparkContext.addJar(path) } + /** + * Analyzes the given table in the current database to generate statistics, which will be + * used in query optimizations. + * + * Right now, it only supports catalog tables and it only updates the size of a catalog table + * in the external catalog. + */ def analyze(tableName: String): Unit = { - throw new UnsupportedOperationException + AnalyzeTable(tableName).run(ctx) } def runNativeSql(sql: String): Seq[String] = { |