author     Reynold Xin <rxin@databricks.com>    2016-04-21 17:41:29 -0700
committer  Reynold Xin <rxin@databricks.com>    2016-04-21 17:41:29 -0700
commit     f181aee07c0ee105b2a34581105eeeada7d42363 (patch)
tree       d3dd7f74e0aa2ad9881517ce8c79052bf0863de0 /sql/core
parent     4e726227a3e68c776ea30b78b7db8d01d00b44d6 (diff)
[SPARK-14821][SQL] Implement AnalyzeTable in sql/core and remove HiveSqlAstBuilder
## What changes were proposed in this pull request?

This patch moves analyze table parsing into SparkSqlAstBuilder and removes HiveSqlAstBuilder. In order to avoid extensive refactoring, I created a common trait for CatalogRelation and MetastoreRelation, and match on that. In the future we should probably just consolidate the two into a single thing so we don't need this common trait.

## How was this patch tested?

Updated unit tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #12584 from rxin/SPARK-14821.
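For context, a hedged sketch of how the newly parsed statement is exercised; `sqlContext` and the table name `src` are illustrative and assume a SQLContext backed by a Hive metastore:

```scala
// Illustrative only: "src" is a hypothetical Hive table.
// With this patch the statement is parsed by SparkSqlAstBuilder.visitAnalyze
// into the new sql/core AnalyzeTable command; only the NOSCAN variant is
// implemented natively, and it refreshes the table's "totalSize" property.
sqlContext.sql("ANALYZE TABLE src COMPUTE STATISTICS NOSCAN")
```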
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala        |  19
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala  | 111
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala           |  10
3 files changed, 139 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 05fb1ef631..cae6430693 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -87,6 +87,25 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
/**
+ * Create an [[AnalyzeTable]] command. This currently only implements the NOSCAN option (other
+ * options are passed on to Hive) e.g.:
+ * {{{
+ * ANALYZE TABLE table COMPUTE STATISTICS NOSCAN;
+ * }}}
+ */
+ override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) {
+ if (ctx.partitionSpec == null &&
+ ctx.identifier != null &&
+ ctx.identifier.getText.toLowerCase == "noscan") {
+ AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
+ } else {
+ // Always just run the no scan analyze. We should fix this and implement full analyze
+ // command in the future.
+ AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
+ }
+ }
+
+ /**
* Create a [[SetDatabaseCommand]] logical plan.
*/
override def visitUse(ctx: UseContext): LogicalPlan = withOrigin(ctx) {
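
The hunk above routes ANALYZE TABLE through SparkSqlAstBuilder. A minimal sketch of the resulting plan, assuming access to the (private[sql]) session state of an existing SQLContext and a hypothetical table name:

```scala
// Illustrative sketch of the internal parsing path; "my_table" is hypothetical.
val plan = sqlContext.sessionState.sqlParser.parsePlan(
  "ANALYZE TABLE my_table COMPUTE STATISTICS NOSCAN")
// Expected to be the AnalyzeTable command added in this patch, carrying the
// parsed table identifier; per the TODO above, the non-NOSCAN form currently
// produces the same plan.
```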
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
new file mode 100644
index 0000000000..7fa246ba51
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
+
+
+/**
+ * Analyzes the given table in the current database to generate statistics, which will be
+ * used in query optimizations.
+ *
+ * Right now, it only supports Hive tables and it only updates the size of a Hive table
+ * in the Hive metastore.
+ */
+case class AnalyzeTable(tableName: String) extends RunnableCommand {
+
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ val sessionState = sqlContext.sessionState
+ val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
+ val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
+
+ relation match {
+ case relation: CatalogRelation =>
+ val catalogTable: CatalogTable = relation.catalogTable
+ // This method is mainly based on
+ // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
+ // in Hive 0.13 (except that we do not use fs.getContentSummary).
+ // TODO: Generalize statistics collection.
+ // TODO: Why fs.getContentSummary returns wrong size on Jenkins?
+ // Can we use fs.getContentSummary in future?
+ // Seems fs.getContentSummary returns wrong table size on Jenkins. So we use
+ // countFileSize to count the table size.
+ val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+
+ def calculateTableSize(fs: FileSystem, path: Path): Long = {
+ val fileStatus = fs.getFileStatus(path)
+ val size = if (fileStatus.isDirectory) {
+ fs.listStatus(path)
+ .map { status =>
+ if (!status.getPath.getName.startsWith(stagingDir)) {
+ calculateTableSize(fs, status.getPath)
+ } else {
+ 0L
+ }
+ }.sum
+ } else {
+ fileStatus.getLen
+ }
+
+ size
+ }
+
+ val tableParameters = catalogTable.properties
+ val oldTotalSize = tableParameters.get("totalSize").map(_.toLong).getOrElse(0L)
+ val newTotalSize =
+ catalogTable.storage.locationUri.map { p =>
+ val path = new Path(p)
+ try {
+ val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ calculateTableSize(fs, path)
+ } catch {
+ case NonFatal(e) =>
+ logWarning(
+ s"Failed to get the size of table ${catalogTable.identifier.table} in the " +
+ s"database ${catalogTable.identifier.database} because of ${e.toString}", e)
+ 0L
+ }
+ }.getOrElse(0L)
+
+ // Update the Hive metastore if the total size of the table is different than the size
+ // recorded in the Hive metastore.
+ // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
+ if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
+ sessionState.catalog.alterTable(
+ catalogTable.copy(
+ properties = relation.catalogTable.properties +
+ (AnalyzeTable.TOTAL_SIZE_FIELD -> newTotalSize.toString)))
+ }
+
+ case otherRelation =>
+ throw new UnsupportedOperationException(
+ s"Analyze only works for Hive tables, but $tableName is a ${otherRelation.nodeName}")
+ }
+ Seq.empty[Row]
+ }
+}
+
+object AnalyzeTable {
+ val TOTAL_SIZE_FIELD = "totalSize"
+}
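
For readers skimming the new command, the heart of it is the recursive size walk in calculateTableSize above. Below is a standalone, hedged re-statement using only Hadoop FileSystem APIs; the object name, path, and staging prefix are illustrative, not taken from the patch:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch mirroring calculateTableSize: sums file lengths under a path,
// skipping entries whose name starts with the staging-dir prefix.
object TableSizeSketch {
  def sizeUnder(fs: FileSystem, path: Path, stagingDir: String): Long = {
    val status = fs.getFileStatus(path)
    if (status.isDirectory) {
      fs.listStatus(path)
        .filterNot(_.getPath.getName.startsWith(stagingDir))
        .map(s => sizeUnder(fs, s.getPath, stagingDir))
        .sum
    } else {
      status.getLen
    }
  }

  def main(args: Array[String]): Unit = {
    // "/tmp/example_table" is a hypothetical location used only for illustration.
    val path = new Path("/tmp/example_table")
    val fs = path.getFileSystem(new Configuration())
    println(sizeUnder(fs, path, ".hive-staging"))
  }
}
```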
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index e1be4b882f..c423b84957 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.AnalyzeTable
import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, PreInsertCastAndRename, ResolveDataSource}
import org.apache.spark.sql.util.ExecutionListenerManager
@@ -162,8 +163,15 @@ private[sql] class SessionState(ctx: SQLContext) {
ctx.sparkContext.addJar(path)
}
+ /**
+ * Analyzes the given table in the current database to generate statistics, which will be
+ * used in query optimizations.
+ *
+ * Right now, it only supports catalog tables and it only updates the size of a catalog table
+ * in the external catalog.
+ */
def analyze(tableName: String): Unit = {
- throw new UnsupportedOperationException
+ AnalyzeTable(tableName).run(ctx)
}
def runNativeSql(sql: String): Seq[String] = {
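
Together with the AnalyzeTable file above, this change means SessionState.analyze now delegates to the sql/core command. A hedged sketch of that delegation, assuming an existing SQLContext (session state is private[sql], so this mirrors internal usage only) and a hypothetical table name:

```scala
import org.apache.spark.sql.execution.command.AnalyzeTable

// Illustrative: "sales" is a hypothetical catalog table.
// Equivalent to sqlContext.sessionState.analyze("sales") after this patch:
// it recomputes the table's on-disk size and updates the "totalSize"
// property in the external catalog when the value changes.
AnalyzeTable("sales").run(sqlContext)
```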