13 files changed, 199 insertions, 226 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 34e1cb7315..ab5124ea56 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -266,7 +266,7 @@ class SessionCatalog(
     val relation =
       if (name.database.isDefined || !tempTables.contains(table)) {
         val metadata = externalCatalog.getTable(db, table)
-        CatalogRelation(db, metadata, alias)
+        SimpleCatalogRelation(db, metadata, alias)
       } else {
         tempTables(table)
       }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index ad989a97e4..d2294efd9a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -295,17 +295,31 @@ object ExternalCatalog {
 
 /**
+ * An interface that is implemented by logical plans to return the underlying catalog table.
+ * If we can in the future consolidate SimpleCatalogRelation and MetastoreRelation, we should
+ * probably remove this interface.
+ */
+trait CatalogRelation {
+  def catalogTable: CatalogTable
+}
+
+
+/**
  * A [[LogicalPlan]] that wraps [[CatalogTable]].
+ *
+ * Note that in the future we should consolidate this and HiveCatalogRelation.
  */
-case class CatalogRelation(
-    db: String,
+case class SimpleCatalogRelation(
+    databaseName: String,
     metadata: CatalogTable,
     alias: Option[String] = None)
-  extends LeafNode {
+  extends LeafNode with CatalogRelation {
 
   // TODO: implement this
   override def output: Seq[Attribute] = Seq.empty
 
-  require(metadata.identifier.database == Some(db),
+  override def catalogTable: CatalogTable = metadata
+
+  require(metadata.identifier.database == Some(databaseName),
     "provided database does not match the one specified in the table definition")
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 426273e1e3..27205c4587 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -372,25 +372,25 @@ class SessionCatalogSuite extends SparkFunSuite {
     sessionCatalog.setCurrentDatabase("db2")
     // If we explicitly specify the database, we'll look up the relation in that database
     assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1", Some("db2")))
-      == SubqueryAlias("tbl1", CatalogRelation("db2", metastoreTable1)))
+      == SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1)))
     // Otherwise, we'll first look up a temporary table with the same name
     assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
       == SubqueryAlias("tbl1", tempTable1))
     // Then, if that does not exist, look up the relation in the current database
     sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false)
     assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
-      == SubqueryAlias("tbl1", CatalogRelation("db2", metastoreTable1)))
+      == SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1)))
   }
 
   test("lookup table relation with alias") {
     val catalog = new SessionCatalog(newBasicCatalog())
     val alias = "monster"
     val tableMetadata = catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))
-    val relation = SubqueryAlias("tbl1", CatalogRelation("db2", tableMetadata))
+    val relation = SubqueryAlias("tbl1", SimpleCatalogRelation("db2", tableMetadata))
     val relationWithAlias =
       SubqueryAlias(alias,
         SubqueryAlias("tbl1",
-          CatalogRelation("db2", tableMetadata, Some(alias))))
+          SimpleCatalogRelation("db2", tableMetadata, Some(alias))))
     assert(catalog.lookupRelation(
       TableIdentifier("tbl1", Some("db2")), alias = None) == relation)
     assert(catalog.lookupRelation(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 05fb1ef631..cae6430693 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -87,6 +87,25 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
+   * Create an [[AnalyzeTable]] command. This currently only implements the NOSCAN option (other
+   * options are passed on to Hive) e.g.:
+   * {{{
+   *   ANALYZE TABLE table COMPUTE STATISTICS NOSCAN;
+   * }}}
+   */
+  override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) {
+    if (ctx.partitionSpec == null &&
+      ctx.identifier != null &&
+      ctx.identifier.getText.toLowerCase == "noscan") {
+      AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
+    } else {
+      // Always just run the no scan analyze. We should fix this and implement full analyze
+      // command in the future.
+      AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
+    }
+  }
+
+  /**
    * Create a [[SetDatabaseCommand]] logical plan.
    */
   override def visitUse(ctx: UseContext): LogicalPlan = withOrigin(ctx) {
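The point of the new CatalogRelation trait above is that a command living in sql/core can reach the underlying CatalogTable without depending on the Hive module: both SimpleCatalogRelation and MetastoreRelation (changed later in this diff) expose it. A minimal sketch of that pattern, with the helper name being illustrative only:

    import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
    import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // Works for SimpleCatalogRelation and MetastoreRelation alike, since both mix in CatalogRelation.
    def catalogTableOf(plan: LogicalPlan): Option[CatalogTable] =
      EliminateSubqueryAliases(plan) match {
        case r: CatalogRelation => Some(r.catalogTable)
        case _ => None
      }

This is exactly the match that the new AnalyzeTable command (next file) performs before collecting statistics.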
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
new file mode 100644
index 0000000000..7fa246ba51
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
+
+
+/**
+ * Analyzes the given table in the current database to generate statistics, which will be
+ * used in query optimizations.
+ *
+ * Right now, it only supports Hive tables and it only updates the size of a Hive table
+ * in the Hive metastore.
+ */
+case class AnalyzeTable(tableName: String) extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val sessionState = sqlContext.sessionState
+    val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
+    val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
+
+    relation match {
+      case relation: CatalogRelation =>
+        val catalogTable: CatalogTable = relation.catalogTable
+        // This method is mainly based on
+        // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
+        // in Hive 0.13 (except that we do not use fs.getContentSummary).
+        // TODO: Generalize statistics collection.
+        // TODO: Why fs.getContentSummary returns wrong size on Jenkins?
+        // Can we use fs.getContentSummary in future?
+        // Seems fs.getContentSummary returns wrong table size on Jenkins. So we use
+        // countFileSize to count the table size.
+        val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+
+        def calculateTableSize(fs: FileSystem, path: Path): Long = {
+          val fileStatus = fs.getFileStatus(path)
+          val size = if (fileStatus.isDirectory) {
+            fs.listStatus(path)
+              .map { status =>
+                if (!status.getPath.getName.startsWith(stagingDir)) {
+                  calculateTableSize(fs, status.getPath)
+                } else {
+                  0L
+                }
+              }.sum
+          } else {
+            fileStatus.getLen
+          }
+
+          size
+        }
+
+        val tableParameters = catalogTable.properties
+        val oldTotalSize = tableParameters.get("totalSize").map(_.toLong).getOrElse(0L)
+        val newTotalSize =
+          catalogTable.storage.locationUri.map { p =>
+            val path = new Path(p)
+            try {
+              val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+              calculateTableSize(fs, path)
+            } catch {
+              case NonFatal(e) =>
+                logWarning(
+                  s"Failed to get the size of table ${catalogTable.identifier.table} in the " +
+                    s"database ${catalogTable.identifier.database} because of ${e.toString}", e)
+                0L
+            }
+          }.getOrElse(0L)
+
+        // Update the Hive metastore if the total size of the table is different than the size
+        // recorded in the Hive metastore.
+        // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
+        if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
+          sessionState.catalog.alterTable(
+            catalogTable.copy(
+              properties = relation.catalogTable.properties +
+                (AnalyzeTable.TOTAL_SIZE_FIELD -> newTotalSize.toString)))
+        }
+
+      case otherRelation =>
+        throw new UnsupportedOperationException(
+          s"Analyze only works for Hive tables, but $tableName is a ${otherRelation.nodeName}")
+    }
+    Seq.empty[Row]
+  }
+}
+
+object AnalyzeTable {
+  val TOTAL_SIZE_FIELD = "totalSize"
+}
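A sketch of how the command is exercised end to end, using the same internal session-state hooks the command itself uses (the table name src is only an example):

    // Compute and store the table size, then read it back from the catalog properties.
    sqlContext.sql("ANALYZE TABLE src COMPUTE STATISTICS noscan")
    val tableIdent = sqlContext.sessionState.sqlParser.parseTableIdentifier("src")
    val metadata = sqlContext.sessionState.catalog.getTableMetadata(tableIdent)
    val sizeInBytes = metadata.properties.get(AnalyzeTable.TOTAL_SIZE_FIELD).map(_.toLong)

If the table location cannot be scanned, newTotalSize stays 0 and the existing totalSize property is left untouched, so sizeInBytes here may be None for a never-analyzed table.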
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index e1be4b882f..c423b84957 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -29,6 +29,7 @@
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.AnalyzeTable
 import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, PreInsertCastAndRename, ResolveDataSource}
 import org.apache.spark.sql.util.ExecutionListenerManager
@@ -162,8 +163,15 @@ private[sql] class SessionState(ctx: SQLContext) {
     ctx.sparkContext.addJar(path)
   }
 
+  /**
+   * Analyzes the given table in the current database to generate statistics, which will be
+   * used in query optimizations.
+   *
+   * Right now, it only supports catalog tables and it only updates the size of a catalog table
+   * in the external catalog.
+   */
   def analyze(tableName: String): Unit = {
-    throw new UnsupportedOperationException
+    AnalyzeTable(tableName).run(ctx)
   }
 
   def runNativeSql(sql: String): Seq[String] = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index ca397910c6..df2b6beac6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -392,7 +392,7 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
     cachedDataSourceTables.getIfPresent(tableIdentifier) match {
       case null => None // Cache miss
       case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
-        val pathsInMetastore = metastoreRelation.table.storage.locationUri.toSeq
+        val pathsInMetastore = metastoreRelation.catalogTable.storage.locationUri.toSeq
         val cachedRelationFileFormatClass = relation.fileFormat.getClass
 
         expectedFileFormat match {
@@ -467,7 +467,7 @@
           Some(partitionSpec))
 
         val hadoopFsRelation = cached.getOrElse {
-          val paths = new Path(metastoreRelation.table.storage.locationUri.get) :: Nil
+          val paths = new Path(metastoreRelation.catalogTable.storage.locationUri.get) :: Nil
           val fileCatalog = new MetaStoreFileCatalog(hive, paths, partitionSpec)
 
           val inferredSchema = if (fileType.equals("parquet")) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 6f4332c65f..4db0d78cfc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -24,12 +24,10 @@
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.Analyzer
-import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlanner
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
-import org.apache.spark.sql.hive.execution.{AnalyzeTable, HiveSqlParser}
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
@@ -106,11 +104,6 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
   }
 
   /**
-   * Parser for HiveQl query texts.
-   */
-  override lazy val sqlParser: ParserInterface = new HiveSqlParser(conf)
-
-  /**
    * Planner that takes into account Hive-specific strategies.
    */
   override def planner: SparkPlanner = {
@@ -175,17 +168,6 @@
   }
 
   /**
-   * Analyzes the given table in the current database to generate statistics, which will be
-   * used in query optimizations.
-   *
-   * Right now, it only supports Hive tables and it only updates the size of a Hive table
-   * in the Hive metastore.
-   */
-  override def analyze(tableName: String): Unit = {
-    AnalyzeTable(tableName).run(ctx)
-  }
-
-  /**
    * Execute a SQL statement by passing the query text directly to Hive.
    */
   override def runNativeSql(sql: String): Seq[String] = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index a66c325b8f..cd45706841 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc
 
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTablePartition, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.parser.DataTypeParser
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
@@ -37,13 +37,13 @@ import org.apache.spark.sql.hive.client.HiveClient
 
 
 private[hive] case class MetastoreRelation(
-    databaseName: String,
-    tableName: String,
-    alias: Option[String])
-    (val table: CatalogTable,
-     @transient private val client: HiveClient,
-     @transient private val sqlContext: SQLContext)
-  extends LeafNode with MultiInstanceRelation with FileRelation {
+    databaseName: String,
+    tableName: String,
+    alias: Option[String])
+    (val catalogTable: CatalogTable,
+     @transient private val client: HiveClient,
+     @transient private val sqlContext: SQLContext)
+  extends LeafNode with MultiInstanceRelation with FileRelation with CatalogRelation {
 
   override def equals(other: Any): Boolean = other match {
     case relation: MetastoreRelation =>
@@ -58,7 +58,7 @@ private[hive] case class MetastoreRelation(
     Objects.hashCode(databaseName, tableName, alias, output)
   }
 
-  override protected def otherCopyArgs: Seq[AnyRef] = table :: sqlContext :: Nil
+  override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sqlContext :: Nil
 
   private def toHiveColumn(c: CatalogColumn): FieldSchema = {
     new FieldSchema(c.name, c.dataType, c.comment.orNull)
@@ -69,14 +69,14 @@ private[hive] case class MetastoreRelation(
     // We start by constructing an API table as Hive performs several important transformations
     // internally when converting an API table to a QL table.
     val tTable = new org.apache.hadoop.hive.metastore.api.Table()
-    tTable.setTableName(table.identifier.table)
-    tTable.setDbName(table.database)
+    tTable.setTableName(catalogTable.identifier.table)
+    tTable.setDbName(catalogTable.database)
 
     val tableParameters = new java.util.HashMap[String, String]()
     tTable.setParameters(tableParameters)
-    table.properties.foreach { case (k, v) => tableParameters.put(k, v) }
+    catalogTable.properties.foreach { case (k, v) => tableParameters.put(k, v) }
 
-    tTable.setTableType(table.tableType match {
+    tTable.setTableType(catalogTable.tableType match {
       case CatalogTableType.EXTERNAL_TABLE => HiveTableType.EXTERNAL_TABLE.toString
       case CatalogTableType.MANAGED_TABLE => HiveTableType.MANAGED_TABLE.toString
       case CatalogTableType.INDEX_TABLE => HiveTableType.INDEX_TABLE.toString
@@ -87,22 +87,22 @@ private[hive] case class MetastoreRelation(
     tTable.setSd(sd)
 
     // Note: In Hive the schema and partition columns must be disjoint sets
-    val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
-      table.partitionColumnNames.contains(c.getName)
+    val (partCols, schema) = catalogTable.schema.map(toHiveColumn).partition { c =>
+      catalogTable.partitionColumnNames.contains(c.getName)
     }
     sd.setCols(schema.asJava)
     tTable.setPartitionKeys(partCols.asJava)
 
-    table.storage.locationUri.foreach(sd.setLocation)
-    table.storage.inputFormat.foreach(sd.setInputFormat)
-    table.storage.outputFormat.foreach(sd.setOutputFormat)
+    catalogTable.storage.locationUri.foreach(sd.setLocation)
+    catalogTable.storage.inputFormat.foreach(sd.setInputFormat)
+    catalogTable.storage.outputFormat.foreach(sd.setOutputFormat)
 
     val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
-    table.storage.serde.foreach(serdeInfo.setSerializationLib)
+    catalogTable.storage.serde.foreach(serdeInfo.setSerializationLib)
     sd.setSerdeInfo(serdeInfo)
 
     val serdeParameters = new java.util.HashMap[String, String]()
-    table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+    catalogTable.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
     serdeInfo.setParameters(serdeParameters)
 
     new HiveTable(tTable)
@@ -130,11 +130,11 @@ private[hive] case class MetastoreRelation(
 
   // When metastore partition pruning is turned off, we cache the list of all partitions to
   // mimic the behavior of Spark < 1.5
-  private lazy val allPartitions: Seq[CatalogTablePartition] = client.getAllPartitions(table)
+  private lazy val allPartitions: Seq[CatalogTablePartition] = client.getAllPartitions(catalogTable)
 
   def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
     val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
-      client.getPartitionsByFilter(table, predicates)
+      client.getPartitionsByFilter(catalogTable, predicates)
     } else {
       allPartitions
     }
@@ -147,7 +147,7 @@ private[hive] case class MetastoreRelation(
 
       val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
       tPartition.setSd(sd)
-      sd.setCols(table.schema.map(toHiveColumn).asJava)
+      sd.setCols(catalogTable.schema.map(toHiveColumn).asJava)
       p.storage.locationUri.foreach(sd.setLocation)
      p.storage.inputFormat.foreach(sd.setInputFormat)
       p.storage.outputFormat.foreach(sd.setOutputFormat)
@@ -158,7 +158,7 @@ private[hive] case class MetastoreRelation(
      p.storage.serde.foreach(serdeInfo.setSerializationLib)
 
       val serdeParameters = new java.util.HashMap[String, String]()
-      table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+      catalogTable.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
       p.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
       serdeInfo.setParameters(serdeParameters)
 
@@ -195,12 +195,12 @@ private[hive] case class MetastoreRelation(
   }
 
   /** PartitionKey attributes */
-  val partitionKeys = table.partitionColumns.map(_.toAttribute)
+  val partitionKeys = catalogTable.partitionColumns.map(_.toAttribute)
 
   /** Non-partitionKey attributes */
   // TODO: just make this hold the schema itself, not just non-partition columns
-  val attributes = table.schema
-    .filter { c => !table.partitionColumnNames.contains(c.name) }
+  val attributes = catalogTable.schema
+    .filter { c => !catalogTable.partitionColumnNames.contains(c.name) }
     .map(_.toAttribute)
 
   val output = attributes ++ partitionKeys
@@ -213,19 +213,19 @@ private[hive] case class MetastoreRelation(
 
   override def inputFiles: Array[String] = {
     val partLocations = client
-      .getPartitionsByFilter(table, Nil)
+      .getPartitionsByFilter(catalogTable, Nil)
       .flatMap(_.storage.locationUri)
       .toArray
     if (partLocations.nonEmpty) {
       partLocations
     } else {
       Array(
-        table.storage.locationUri.getOrElse(
-          sys.error(s"Could not get the location of ${table.qualifiedName}.")))
+        catalogTable.storage.locationUri.getOrElse(
+          sys.error(s"Could not get the location of ${catalogTable.qualifiedName}.")))
     }
   }
 
   override def newInstance(): MetastoreRelation = {
-    MetastoreRelation(databaseName, tableName, alias)(table, client, sqlContext)
+    MetastoreRelation(databaseName, tableName, alias)(catalogTable, client, sqlContext)
   }
 }
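The table-to-catalogTable rename above is mechanical, but the stored totalSize is what makes ANALYZE useful: MetastoreRelation's statistics (not part of this diff) feed a sizeInBytes estimate to the planner, for example for broadcast-join decisions. A simplified sketch of that consumption side, with the fallback value being an assumption rather than the real default:

    import org.apache.spark.sql.catalyst.catalog.CatalogTable
    import org.apache.spark.sql.catalyst.plans.logical.Statistics

    // Simplified: prefer the totalSize written by ANALYZE TABLE; otherwise fall back to a large
    // default so the planner does not broadcast a table of unknown size.
    def statisticsFor(catalogTable: CatalogTable, defaultSizeInBytes: Long): Statistics = {
      val totalSize = catalogTable.properties.get("totalSize").map(_.toLong).filter(_ > 0L)
      Statistics(sizeInBytes = BigInt(totalSize.getOrElse(defaultSizeInBytes)))
    }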
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
deleted file mode 100644
index 35530b9814..0000000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import org.apache.spark.sql.catalyst.parser._
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.HiveNativeCommand
-import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
-
-/**
- * Concrete parser for HiveQl statements.
- */
-class HiveSqlParser(conf: SQLConf) extends AbstractSqlParser {
-
-  val astBuilder = new HiveSqlAstBuilder(conf)
-
-  private val substitutor = new VariableSubstitution(conf)
-
-  protected override def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
-    super.parse(substitutor.substitute(command))(toResult)
-  }
-
-  protected override def nativeCommand(sqlText: String): LogicalPlan = {
-    HiveNativeCommand(substitutor.substitute(sqlText))
-  }
-}
-
-/**
- * Builder that converts an ANTLR ParseTree into a LogicalPlan/Expression/TableIdentifier.
- */
-class HiveSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
-
-  import ParserUtils._
-
-  /**
-   * Create an [[AnalyzeTable]] command. This currently only implements the NOSCAN option (other
-   * options are passed on to Hive) e.g.:
-   * {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS NOSCAN;
-   * }}}
-   */
-  override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) {
-    if (ctx.partitionSpec == null &&
-      ctx.identifier != null &&
-      ctx.identifier.getText.toLowerCase == "noscan") {
-      AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
-    } else {
-      // Always just run the no scan analyze. We should fix this and implement full analyze
-      // command in the future.
-      AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
-    }
-  }
-}
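With HiveSqlParser deleted, the ANALYZE statement is recognized by the core SparkSqlAstBuilder shown earlier. A rough parser-level check, mirroring the shape of the deleted class (the test parser below is hypothetical and assumes it runs inside the sql package, where SQLConf is visible):

    import org.apache.spark.sql.catalyst.parser.AbstractSqlParser
    import org.apache.spark.sql.execution.SparkSqlAstBuilder
    import org.apache.spark.sql.execution.command.AnalyzeTable
    import org.apache.spark.sql.internal.SQLConf

    // Same skeleton as the deleted HiveSqlParser, minus variable substitution and native commands.
    class AnalyzeOnlyParser(conf: SQLConf) extends AbstractSqlParser {
      val astBuilder = new SparkSqlAstBuilder(conf)
    }

    val parser = new AnalyzeOnlyParser(new SQLConf)
    assert(parser.parsePlan("ANALYZE TABLE logs COMPUTE STATISTICS noscan").isInstanceOf[AnalyzeTable])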
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 7e9669af8b..6899f46eec 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -17,109 +17,19 @@
 package org.apache.spark.sql.hive.execution
 
-import scala.util.control.NonFatal
-
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.common.StatsSetupConst
-import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.MetaStoreUtils
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSource, HadoopFsRelation, LogicalRelation}
-import org.apache.spark.sql.hive.{HiveSessionState, MetastoreRelation}
+import org.apache.spark.sql.hive.HiveSessionState
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 
 
-/**
- * Analyzes the given table in the current database to generate statistics, which will be
- * used in query optimizations.
- *
- * Right now, it only supports Hive tables and it only updates the size of a Hive table
- * in the Hive metastore.
- */
-private[hive]
-case class AnalyzeTable(tableName: String) extends RunnableCommand {
-
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
-    val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
-    val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
-
-    relation match {
-      case relation: MetastoreRelation =>
-        val catalogTable: CatalogTable = relation.table
-        // This method is mainly based on
-        // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
-        // in Hive 0.13 (except that we do not use fs.getContentSummary).
-        // TODO: Generalize statistics collection.
-        // TODO: Why fs.getContentSummary returns wrong size on Jenkins?
-        // Can we use fs.getContentSummary in future?
-        // Seems fs.getContentSummary returns wrong table size on Jenkins. So we use
-        // countFileSize to count the table size.
-        val stagingDir = sessionState.metadataHive.getConf(
-          HiveConf.ConfVars.STAGINGDIR.varname,
-          HiveConf.ConfVars.STAGINGDIR.defaultStrVal)
-
-        def calculateTableSize(fs: FileSystem, path: Path): Long = {
-          val fileStatus = fs.getFileStatus(path)
-          val size = if (fileStatus.isDirectory) {
-            fs.listStatus(path)
-              .map { status =>
-                if (!status.getPath().getName().startsWith(stagingDir)) {
-                  calculateTableSize(fs, status.getPath)
-                } else {
-                  0L
-                }
-              }
-              .sum
-          } else {
-            fileStatus.getLen
-          }
-
-          size
-        }
-
-        val tableParameters = catalogTable.properties
-        val oldTotalSize = tableParameters.get("totalSize").map(_.toLong).getOrElse(0L)
-        val newTotalSize =
-          catalogTable.storage.locationUri.map { p =>
-            val path = new Path(p)
-            try {
-              val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-              calculateTableSize(fs, path)
-            } catch {
-              case NonFatal(e) =>
-                logWarning(
-                  s"Failed to get the size of table ${catalogTable.identifier.table} in the " +
-                    s"database ${catalogTable.identifier.database} because of ${e.toString}", e)
-                0L
-            }
-          }.getOrElse(0L)
-
-        // Update the Hive metastore if the total size of the table is different than the size
-        // recorded in the Hive metastore.
-        // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
-        if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
-          sessionState.catalog.alterTable(
-            catalogTable.copy(
-              properties = relation.table.properties +
-                (StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString)))
-        }
-
-      case otherRelation =>
-        throw new UnsupportedOperationException(
-          s"Analyze only works for Hive tables, but $tableName is a ${otherRelation.nodeName}")
-    }
-    Seq.empty[Row]
-  }
-}
-
 private[hive]
 case class CreateMetastoreDataSource(
     tableIdent: TableIdentifier,
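Functionally, the deleted Hive-side command and its sql/core replacement differ only in where two constants come from; dropping StatsSetupConst and HiveConf is what removes the Hive dependency. The correspondence, written as an illustrative check (the literal values are the Hive defaults these classes are expected to carry, so treat them as assumptions):

    import org.apache.hadoop.hive.common.StatsSetupConst
    import org.apache.hadoop.hive.conf.HiveConf

    // The new AnalyzeTable hardcodes these strings; the old command read them from Hive classes.
    assert(StatsSetupConst.TOTAL_SIZE == "totalSize")
    assert(HiveConf.ConfVars.STAGINGDIR.varname == "hive.exec.stagingdir")
    assert(HiveConf.ConfVars.STAGINGDIR.defaultStrVal == ".hive-staging")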
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 7a6f1ce0d1..565b310bb7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -21,9 +21,8 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.HiveNativeCommand
+import org.apache.spark.sql.execution.command.AnalyzeTable
 import org.apache.spark.sql.execution.joins._
-import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
@@ -117,7 +116,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
     // Try to analyze a temp table
     sql("""SELECT * FROM src""").registerTempTable("tempTable")
     intercept[UnsupportedOperationException] {
-      hiveContext.sessionState.analyze("tempTable")
+      hiveContext.sql("ANALYZE TABLE tempTable COMPUTE STATISTICS")
     }
     hiveContext.sessionState.catalog.dropTable(
       TableIdentifier("tempTable"), ignoreIfNotExists = true)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
index 79ac53c863..12f30e2e74 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
@@ -153,7 +153,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
     val (actualScannedColumns, actualPartValues) = plan.collect {
       case p @ HiveTableScan(columns, relation, _) =>
        val columnNames = columns.map(_.name)
-        val partValues = if (relation.table.partitionColumnNames.nonEmpty) {
+        val partValues = if (relation.catalogTable.partitionColumnNames.nonEmpty) {
          p.prunePartitions(relation.getHiveQlPartitions()).map(_.getValues)
         } else {
           Seq.empty
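The updated StatisticsSuite test still passes because the failure now comes from AnalyzeTable's own match: a temporary table resolves to a plain logical plan rather than a CatalogRelation, so it falls into the otherRelation branch. A compact sketch of the two paths (table and view names are illustrative; intercept is the ScalaTest helper already used in the suite):

    // A catalog table can be analyzed; the size ends up in its totalSize property.
    sqlContext.sql("ANALYZE TABLE src COMPUTE STATISTICS noscan")

    // A temporary table is not a CatalogRelation, so AnalyzeTable rejects it.
    sqlContext.sql("SELECT * FROM src").registerTempTable("tempTable")
    intercept[UnsupportedOperationException] {
      sqlContext.sql("ANALYZE TABLE tempTable COMPUTE STATISTICS")
    }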