author     Reynold Xin <rxin@databricks.com>    2016-04-21 17:41:29 -0700
committer  Reynold Xin <rxin@databricks.com>    2016-04-21 17:41:29 -0700
commit     f181aee07c0ee105b2a34581105eeeada7d42363 (patch)
tree       d3dd7f74e0aa2ad9881517ce8c79052bf0863de0 /sql/hive/src/main/scala/org
parent     4e726227a3e68c776ea30b78b7db8d01d00b44d6 (diff)
[SPARK-14821][SQL] Implement AnalyzeTable in sql/core and remove HiveSqlAstBuilder
## What changes were proposed in this pull request?

This patch moves analyze table parsing into SparkSqlAstBuilder and removes HiveSqlAstBuilder. In order to avoid extensive refactoring, I created a common trait for CatalogRelation and MetastoreRelation, and match on that. In the future we should probably just consolidate the two into a single thing so we don't need this common trait.

## How was this patch tested?

Updated unit tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #12584 from rxin/SPARK-14821.
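Note that the common trait itself is defined in sql/catalyst and is not among the files changed below; only the Hive side of the refactoring appears in this diff. As a minimal sketch of the idea, assuming the trait does nothing more than expose the underlying `CatalogTable` (the member name `catalogTable` is taken from the `MetastoreRelation` change below; everything else here is illustrative, not the actual catalyst source):

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Illustrative only: the real CatalogRelation trait is not part of this diff, so its
// exact members are an assumption inferred from how MetastoreRelation mixes it in below.
trait CatalogRelation {
  def catalogTable: CatalogTable
}

object CatalogRelationSketch {
  // Code in sql/core (such as the relocated AnalyzeTable command) can now match on the
  // trait instead of depending on the Hive-specific MetastoreRelation class.
  def tableLocation(plan: LogicalPlan): Option[String] = plan match {
    case rel: CatalogRelation => rel.catalogTable.storage.locationUri
    case _                    => None
  }
}
```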
Diffstat (limited to 'sql/hive/src/main/scala/org')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala         |  4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala             | 18
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala            | 62
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala      | 70
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala           | 92
5 files changed, 34 insertions, 212 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index ca397910c6..df2b6beac6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -392,7 +392,7 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => None // Cache miss
case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
- val pathsInMetastore = metastoreRelation.table.storage.locationUri.toSeq
+ val pathsInMetastore = metastoreRelation.catalogTable.storage.locationUri.toSeq
val cachedRelationFileFormatClass = relation.fileFormat.getClass
expectedFileFormat match {
@@ -467,7 +467,7 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
Some(partitionSpec))
val hadoopFsRelation = cached.getOrElse {
- val paths = new Path(metastoreRelation.table.storage.locationUri.get) :: Nil
+ val paths = new Path(metastoreRelation.catalogTable.storage.locationUri.get) :: Nil
val fileCatalog = new MetaStoreFileCatalog(hive, paths, partitionSpec)
val inferredSchema = if (fileType.equals("parquet")) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 6f4332c65f..4db0d78cfc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -24,12 +24,10 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.Analyzer
-import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlanner
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
-import org.apache.spark.sql.hive.execution.{AnalyzeTable, HiveSqlParser}
import org.apache.spark.sql.internal.{SessionState, SQLConf}
@@ -106,11 +104,6 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
}
/**
- * Parser for HiveQl query texts.
- */
- override lazy val sqlParser: ParserInterface = new HiveSqlParser(conf)
-
- /**
* Planner that takes into account Hive-specific strategies.
*/
override def planner: SparkPlanner = {
@@ -175,17 +168,6 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
}
/**
- * Analyzes the given table in the current database to generate statistics, which will be
- * used in query optimizations.
- *
- * Right now, it only supports Hive tables and it only updates the size of a Hive table
- * in the Hive metastore.
- */
- override def analyze(tableName: String): Unit = {
- AnalyzeTable(tableName).run(ctx)
- }
-
- /**
* Execute a SQL statement by passing the query text directly to Hive.
*/
override def runNativeSql(sql: String): Seq[String] = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index a66c325b8f..cd45706841 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTablePartition, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression}
import org.apache.spark.sql.catalyst.parser.DataTypeParser
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
@@ -37,13 +37,13 @@ import org.apache.spark.sql.hive.client.HiveClient
private[hive] case class MetastoreRelation(
- databaseName: String,
- tableName: String,
- alias: Option[String])
- (val table: CatalogTable,
- @transient private val client: HiveClient,
- @transient private val sqlContext: SQLContext)
- extends LeafNode with MultiInstanceRelation with FileRelation {
+ databaseName: String,
+ tableName: String,
+ alias: Option[String])
+ (val catalogTable: CatalogTable,
+ @transient private val client: HiveClient,
+ @transient private val sqlContext: SQLContext)
+ extends LeafNode with MultiInstanceRelation with FileRelation with CatalogRelation {
override def equals(other: Any): Boolean = other match {
case relation: MetastoreRelation =>
@@ -58,7 +58,7 @@ private[hive] case class MetastoreRelation(
Objects.hashCode(databaseName, tableName, alias, output)
}
- override protected def otherCopyArgs: Seq[AnyRef] = table :: sqlContext :: Nil
+ override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sqlContext :: Nil
private def toHiveColumn(c: CatalogColumn): FieldSchema = {
new FieldSchema(c.name, c.dataType, c.comment.orNull)
@@ -69,14 +69,14 @@ private[hive] case class MetastoreRelation(
// We start by constructing an API table as Hive performs several important transformations
// internally when converting an API table to a QL table.
val tTable = new org.apache.hadoop.hive.metastore.api.Table()
- tTable.setTableName(table.identifier.table)
- tTable.setDbName(table.database)
+ tTable.setTableName(catalogTable.identifier.table)
+ tTable.setDbName(catalogTable.database)
val tableParameters = new java.util.HashMap[String, String]()
tTable.setParameters(tableParameters)
- table.properties.foreach { case (k, v) => tableParameters.put(k, v) }
+ catalogTable.properties.foreach { case (k, v) => tableParameters.put(k, v) }
- tTable.setTableType(table.tableType match {
+ tTable.setTableType(catalogTable.tableType match {
case CatalogTableType.EXTERNAL_TABLE => HiveTableType.EXTERNAL_TABLE.toString
case CatalogTableType.MANAGED_TABLE => HiveTableType.MANAGED_TABLE.toString
case CatalogTableType.INDEX_TABLE => HiveTableType.INDEX_TABLE.toString
@@ -87,22 +87,22 @@ private[hive] case class MetastoreRelation(
tTable.setSd(sd)
// Note: In Hive the schema and partition columns must be disjoint sets
- val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
- table.partitionColumnNames.contains(c.getName)
+ val (partCols, schema) = catalogTable.schema.map(toHiveColumn).partition { c =>
+ catalogTable.partitionColumnNames.contains(c.getName)
}
sd.setCols(schema.asJava)
tTable.setPartitionKeys(partCols.asJava)
- table.storage.locationUri.foreach(sd.setLocation)
- table.storage.inputFormat.foreach(sd.setInputFormat)
- table.storage.outputFormat.foreach(sd.setOutputFormat)
+ catalogTable.storage.locationUri.foreach(sd.setLocation)
+ catalogTable.storage.inputFormat.foreach(sd.setInputFormat)
+ catalogTable.storage.outputFormat.foreach(sd.setOutputFormat)
val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
- table.storage.serde.foreach(serdeInfo.setSerializationLib)
+ catalogTable.storage.serde.foreach(serdeInfo.setSerializationLib)
sd.setSerdeInfo(serdeInfo)
val serdeParameters = new java.util.HashMap[String, String]()
- table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+ catalogTable.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
serdeInfo.setParameters(serdeParameters)
new HiveTable(tTable)
@@ -130,11 +130,11 @@ private[hive] case class MetastoreRelation(
// When metastore partition pruning is turned off, we cache the list of all partitions to
// mimic the behavior of Spark < 1.5
- private lazy val allPartitions: Seq[CatalogTablePartition] = client.getAllPartitions(table)
+ private lazy val allPartitions: Seq[CatalogTablePartition] = client.getAllPartitions(catalogTable)
def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
- client.getPartitionsByFilter(table, predicates)
+ client.getPartitionsByFilter(catalogTable, predicates)
} else {
allPartitions
}
@@ -147,7 +147,7 @@ private[hive] case class MetastoreRelation(
val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
tPartition.setSd(sd)
- sd.setCols(table.schema.map(toHiveColumn).asJava)
+ sd.setCols(catalogTable.schema.map(toHiveColumn).asJava)
p.storage.locationUri.foreach(sd.setLocation)
p.storage.inputFormat.foreach(sd.setInputFormat)
p.storage.outputFormat.foreach(sd.setOutputFormat)
@@ -158,7 +158,7 @@ private[hive] case class MetastoreRelation(
p.storage.serde.foreach(serdeInfo.setSerializationLib)
val serdeParameters = new java.util.HashMap[String, String]()
- table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+ catalogTable.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
p.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
serdeInfo.setParameters(serdeParameters)
@@ -195,12 +195,12 @@ private[hive] case class MetastoreRelation(
}
/** PartitionKey attributes */
- val partitionKeys = table.partitionColumns.map(_.toAttribute)
+ val partitionKeys = catalogTable.partitionColumns.map(_.toAttribute)
/** Non-partitionKey attributes */
// TODO: just make this hold the schema itself, not just non-partition columns
- val attributes = table.schema
- .filter { c => !table.partitionColumnNames.contains(c.name) }
+ val attributes = catalogTable.schema
+ .filter { c => !catalogTable.partitionColumnNames.contains(c.name) }
.map(_.toAttribute)
val output = attributes ++ partitionKeys
@@ -213,19 +213,19 @@ private[hive] case class MetastoreRelation(
override def inputFiles: Array[String] = {
val partLocations = client
- .getPartitionsByFilter(table, Nil)
+ .getPartitionsByFilter(catalogTable, Nil)
.flatMap(_.storage.locationUri)
.toArray
if (partLocations.nonEmpty) {
partLocations
} else {
Array(
- table.storage.locationUri.getOrElse(
- sys.error(s"Could not get the location of ${table.qualifiedName}.")))
+ catalogTable.storage.locationUri.getOrElse(
+ sys.error(s"Could not get the location of ${catalogTable.qualifiedName}.")))
}
}
override def newInstance(): MetastoreRelation = {
- MetastoreRelation(databaseName, tableName, alias)(table, client, sqlContext)
+ MetastoreRelation(databaseName, tableName, alias)(catalogTable, client, sqlContext)
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
deleted file mode 100644
index 35530b9814..0000000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import org.apache.spark.sql.catalyst.parser._
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.HiveNativeCommand
-import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
-
-/**
- * Concrete parser for HiveQl statements.
- */
-class HiveSqlParser(conf: SQLConf) extends AbstractSqlParser {
-
- val astBuilder = new HiveSqlAstBuilder(conf)
-
- private val substitutor = new VariableSubstitution(conf)
-
- protected override def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
- super.parse(substitutor.substitute(command))(toResult)
- }
-
- protected override def nativeCommand(sqlText: String): LogicalPlan = {
- HiveNativeCommand(substitutor.substitute(sqlText))
- }
-}
-
-/**
- * Builder that converts an ANTLR ParseTree into a LogicalPlan/Expression/TableIdentifier.
- */
-class HiveSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
-
- import ParserUtils._
-
- /**
- * Create an [[AnalyzeTable]] command. This currently only implements the NOSCAN option (other
- * options are passed on to Hive) e.g.:
- * {{{
- * ANALYZE TABLE table COMPUTE STATISTICS NOSCAN;
- * }}}
- */
- override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) {
- if (ctx.partitionSpec == null &&
- ctx.identifier != null &&
- ctx.identifier.getText.toLowerCase == "noscan") {
- AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
- } else {
- // Always just run the no scan analyze. We should fix this and implement full analyze
- // command in the future.
- AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
- }
- }
-}
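(The NOSCAN handling deleted above now has to live in SparkSqlAstBuilder in sql/core, which is outside the scope of this diff. A rough sketch of the relocated hook, mirroring the deleted code; the class name and the import location for the relocated AnalyzeTable command are assumptions, not the actual sql/core source:)

```scala
import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkSqlAstBuilder
import org.apache.spark.sql.execution.command.AnalyzeTable  // assumed new home of the command
import org.apache.spark.sql.internal.SQLConf

// Sketch only: approximates where the deleted visitAnalyze logic ends up after the move.
class AnalyzeParsingSketch(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
  import ParserUtils._

  override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) {
    // As in the deleted Hive builder, every accepted form degenerates to the no-scan
    // analyze; a full ANALYZE implementation is still left for the future.
    AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
  }
}
```

End-user syntax is unchanged, e.g. `ANALYZE TABLE table COMPUTE STATISTICS NOSCAN;`.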
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 7e9669af8b..6899f46eec 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -17,109 +17,19 @@
package org.apache.spark.sql.hive.execution
-import scala.util.control.NonFatal
-
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.common.StatsSetupConst
-import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.MetaStoreUtils
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSource, HadoopFsRelation, LogicalRelation}
-import org.apache.spark.sql.hive.{HiveSessionState, MetastoreRelation}
+import org.apache.spark.sql.hive.HiveSessionState
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
-/**
- * Analyzes the given table in the current database to generate statistics, which will be
- * used in query optimizations.
- *
- * Right now, it only supports Hive tables and it only updates the size of a Hive table
- * in the Hive metastore.
- */
-private[hive]
-case class AnalyzeTable(tableName: String) extends RunnableCommand {
-
- override def run(sqlContext: SQLContext): Seq[Row] = {
- val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
- val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
- val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
-
- relation match {
- case relation: MetastoreRelation =>
- val catalogTable: CatalogTable = relation.table
- // This method is mainly based on
- // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
- // in Hive 0.13 (except that we do not use fs.getContentSummary).
- // TODO: Generalize statistics collection.
- // TODO: Why fs.getContentSummary returns wrong size on Jenkins?
- // Can we use fs.getContentSummary in future?
- // Seems fs.getContentSummary returns wrong table size on Jenkins. So we use
- // countFileSize to count the table size.
- val stagingDir = sessionState.metadataHive.getConf(
- HiveConf.ConfVars.STAGINGDIR.varname,
- HiveConf.ConfVars.STAGINGDIR.defaultStrVal)
-
- def calculateTableSize(fs: FileSystem, path: Path): Long = {
- val fileStatus = fs.getFileStatus(path)
- val size = if (fileStatus.isDirectory) {
- fs.listStatus(path)
- .map { status =>
- if (!status.getPath().getName().startsWith(stagingDir)) {
- calculateTableSize(fs, status.getPath)
- } else {
- 0L
- }
- }
- .sum
- } else {
- fileStatus.getLen
- }
-
- size
- }
-
- val tableParameters = catalogTable.properties
- val oldTotalSize = tableParameters.get("totalSize").map(_.toLong).getOrElse(0L)
- val newTotalSize =
- catalogTable.storage.locationUri.map { p =>
- val path = new Path(p)
- try {
- val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
- calculateTableSize(fs, path)
- } catch {
- case NonFatal(e) =>
- logWarning(
- s"Failed to get the size of table ${catalogTable.identifier.table} in the " +
- s"database ${catalogTable.identifier.database} because of ${e.toString}", e)
- 0L
- }
- }.getOrElse(0L)
-
- // Update the Hive metastore if the total size of the table is different than the size
- // recorded in the Hive metastore.
- // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
- if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
- sessionState.catalog.alterTable(
- catalogTable.copy(
- properties = relation.table.properties +
- (StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString)))
- }
-
- case otherRelation =>
- throw new UnsupportedOperationException(
- s"Analyze only works for Hive tables, but $tableName is a ${otherRelation.nodeName}")
- }
- Seq.empty[Row]
- }
-}
-
private[hive]
case class CreateMetastoreDataSource(
tableIdent: TableIdentifier,