author      Reynold Xin <rxin@databricks.com>    2016-04-21 17:41:29 -0700
committer   Reynold Xin <rxin@databricks.com>    2016-04-21 17:41:29 -0700
commit      f181aee07c0ee105b2a34581105eeeada7d42363 (patch)
tree        d3dd7f74e0aa2ad9881517ce8c79052bf0863de0
parent      4e726227a3e68c776ea30b78b7db8d01d00b44d6 (diff)
[SPARK-14821][SQL] Implement AnalyzeTable in sql/core and remove HiveSqlAstBuilder
## What changes were proposed in this pull request?

This patch moves ANALYZE TABLE parsing into SparkSqlAstBuilder and removes HiveSqlAstBuilder. To avoid extensive refactoring, I created a common trait, CatalogRelation, implemented by both SimpleCatalogRelation and MetastoreRelation, and match on that trait. In the future we should probably consolidate the two into a single class so the common trait is no longer needed.

## How was this patch tested?

Updated unit tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #12584 from rxin/SPARK-14821.
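For orientation, here is a minimal, self-contained sketch of the pattern this patch introduces: both relation classes expose the underlying `CatalogTable` through a shared `CatalogRelation` trait, so the `AnalyzeTable` command in sql/core can match on the trait instead of depending on the Hive-specific `MetastoreRelation`. The types below are deliberately simplified stand-ins (the real classes are logical plans with many more members); only the class names and the matching pattern mirror the diff, and `analyze`/`CatalogRelationSketch` exist only for this illustration.

```scala
object CatalogRelationSketch {

  // Stripped-down stand-in for org.apache.spark.sql.catalyst.catalog.CatalogTable.
  case class CatalogTable(database: String, table: String, properties: Map[String, String])

  // The common trait added to interface.scala by this patch.
  trait CatalogRelation {
    def catalogTable: CatalogTable
  }

  // sql/catalyst side: the old CatalogRelation case class, renamed SimpleCatalogRelation.
  case class SimpleCatalogRelation(databaseName: String, metadata: CatalogTable)
      extends CatalogRelation {
    override def catalogTable: CatalogTable = metadata
  }

  // sql/hive side: MetastoreRelation keeps its second parameter list but now
  // exposes it as `catalogTable` and mixes in the trait.
  case class MetastoreRelation(databaseName: String, tableName: String)(
      val catalogTable: CatalogTable)
    extends CatalogRelation

  // AnalyzeTable can now live in sql/core: it matches on the trait rather than
  // on the Hive-specific relation class.
  def analyze(plan: Any): String = plan match {
    case r: CatalogRelation =>
      s"collect stats for ${r.catalogTable.database}.${r.catalogTable.table}"
    case other =>
      s"Analyze only works for catalog tables, not ${other.getClass.getSimpleName}"
  }

  def main(args: Array[String]): Unit = {
    val meta = CatalogTable("db2", "tbl1", Map("totalSize" -> "0"))
    println(analyze(SimpleCatalogRelation("db2", meta)))      // matched via the trait
    println(analyze(MetastoreRelation("db2", "tbl1")(meta)))  // also matched via the trait
    println(analyze("not a relation"))                        // falls through to the error case
  }
}
```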
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala      |   2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala           |  22
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala |   8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala                 |  19
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala           | 111
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala                    |  10
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala                |   4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala                    |  18
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala                   |  62
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala             |  70
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala                  |  92
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala                     |   5
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala              |   2
13 files changed, 199 insertions(+), 226 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 34e1cb7315..ab5124ea56 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -266,7 +266,7 @@ class SessionCatalog(
val relation =
if (name.database.isDefined || !tempTables.contains(table)) {
val metadata = externalCatalog.getTable(db, table)
- CatalogRelation(db, metadata, alias)
+ SimpleCatalogRelation(db, metadata, alias)
} else {
tempTables(table)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index ad989a97e4..d2294efd9a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -295,17 +295,31 @@ object ExternalCatalog {
/**
+ * An interface that is implemented by logical plans to return the underlying catalog table.
+ * If we can in the future consolidate SimpleCatalogRelation and MetastoreRelation, we should
+ * probably remove this interface.
+ */
+trait CatalogRelation {
+ def catalogTable: CatalogTable
+}
+
+
+/**
* A [[LogicalPlan]] that wraps [[CatalogTable]].
+ *
+ * Note that in the future we should consolidate this and HiveCatalogRelation.
*/
-case class CatalogRelation(
- db: String,
+case class SimpleCatalogRelation(
+ databaseName: String,
metadata: CatalogTable,
alias: Option[String] = None)
- extends LeafNode {
+ extends LeafNode with CatalogRelation {
// TODO: implement this
override def output: Seq[Attribute] = Seq.empty
- require(metadata.identifier.database == Some(db),
+ override def catalogTable: CatalogTable = metadata
+
+ require(metadata.identifier.database == Some(databaseName),
"provided database does not match the one specified in the table definition")
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 426273e1e3..27205c4587 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -372,25 +372,25 @@ class SessionCatalogSuite extends SparkFunSuite {
sessionCatalog.setCurrentDatabase("db2")
// If we explicitly specify the database, we'll look up the relation in that database
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1", Some("db2")))
- == SubqueryAlias("tbl1", CatalogRelation("db2", metastoreTable1)))
+ == SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1)))
// Otherwise, we'll first look up a temporary table with the same name
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
== SubqueryAlias("tbl1", tempTable1))
// Then, if that does not exist, look up the relation in the current database
sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false)
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
- == SubqueryAlias("tbl1", CatalogRelation("db2", metastoreTable1)))
+ == SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1)))
}
test("lookup table relation with alias") {
val catalog = new SessionCatalog(newBasicCatalog())
val alias = "monster"
val tableMetadata = catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))
- val relation = SubqueryAlias("tbl1", CatalogRelation("db2", tableMetadata))
+ val relation = SubqueryAlias("tbl1", SimpleCatalogRelation("db2", tableMetadata))
val relationWithAlias =
SubqueryAlias(alias,
SubqueryAlias("tbl1",
- CatalogRelation("db2", tableMetadata, Some(alias))))
+ SimpleCatalogRelation("db2", tableMetadata, Some(alias))))
assert(catalog.lookupRelation(
TableIdentifier("tbl1", Some("db2")), alias = None) == relation)
assert(catalog.lookupRelation(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 05fb1ef631..cae6430693 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -87,6 +87,25 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
/**
+ * Create an [[AnalyzeTable]] command. This currently only implements the NOSCAN option (other
+ * options are passed on to Hive) e.g.:
+ * {{{
+ * ANALYZE TABLE table COMPUTE STATISTICS NOSCAN;
+ * }}}
+ */
+ override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) {
+ if (ctx.partitionSpec == null &&
+ ctx.identifier != null &&
+ ctx.identifier.getText.toLowerCase == "noscan") {
+ AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
+ } else {
+ // Always just run the no scan analyze. We should fix this and implement full analyze
+ // command in the future.
+ AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
+ }
+ }
+
+ /**
* Create a [[SetDatabaseCommand]] logical plan.
*/
override def visitUse(ctx: UseContext): LogicalPlan = withOrigin(ctx) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
new file mode 100644
index 0000000000..7fa246ba51
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
+
+
+/**
+ * Analyzes the given table in the current database to generate statistics, which will be
+ * used in query optimizations.
+ *
+ * Right now, it only supports Hive tables and it only updates the size of a Hive table
+ * in the Hive metastore.
+ */
+case class AnalyzeTable(tableName: String) extends RunnableCommand {
+
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ val sessionState = sqlContext.sessionState
+ val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
+ val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
+
+ relation match {
+ case relation: CatalogRelation =>
+ val catalogTable: CatalogTable = relation.catalogTable
+ // This method is mainly based on
+ // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
+ // in Hive 0.13 (except that we do not use fs.getContentSummary).
+ // TODO: Generalize statistics collection.
+ // TODO: Why fs.getContentSummary returns wrong size on Jenkins?
+ // Can we use fs.getContentSummary in future?
+ // Seems fs.getContentSummary returns wrong table size on Jenkins. So we use
+ // countFileSize to count the table size.
+ val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+
+ def calculateTableSize(fs: FileSystem, path: Path): Long = {
+ val fileStatus = fs.getFileStatus(path)
+ val size = if (fileStatus.isDirectory) {
+ fs.listStatus(path)
+ .map { status =>
+ if (!status.getPath.getName.startsWith(stagingDir)) {
+ calculateTableSize(fs, status.getPath)
+ } else {
+ 0L
+ }
+ }.sum
+ } else {
+ fileStatus.getLen
+ }
+
+ size
+ }
+
+ val tableParameters = catalogTable.properties
+ val oldTotalSize = tableParameters.get("totalSize").map(_.toLong).getOrElse(0L)
+ val newTotalSize =
+ catalogTable.storage.locationUri.map { p =>
+ val path = new Path(p)
+ try {
+ val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ calculateTableSize(fs, path)
+ } catch {
+ case NonFatal(e) =>
+ logWarning(
+ s"Failed to get the size of table ${catalogTable.identifier.table} in the " +
+ s"database ${catalogTable.identifier.database} because of ${e.toString}", e)
+ 0L
+ }
+ }.getOrElse(0L)
+
+ // Update the Hive metastore if the total size of the table is different than the size
+ // recorded in the Hive metastore.
+ // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
+ if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
+ sessionState.catalog.alterTable(
+ catalogTable.copy(
+ properties = relation.catalogTable.properties +
+ (AnalyzeTable.TOTAL_SIZE_FIELD -> newTotalSize.toString)))
+ }
+
+ case otherRelation =>
+ throw new UnsupportedOperationException(
+ s"Analyze only works for Hive tables, but $tableName is a ${otherRelation.nodeName}")
+ }
+ Seq.empty[Row]
+ }
+}
+
+object AnalyzeTable {
+ val TOTAL_SIZE_FIELD = "totalSize"
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index e1be4b882f..c423b84957 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.AnalyzeTable
import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, PreInsertCastAndRename, ResolveDataSource}
import org.apache.spark.sql.util.ExecutionListenerManager
@@ -162,8 +163,15 @@ private[sql] class SessionState(ctx: SQLContext) {
ctx.sparkContext.addJar(path)
}
+ /**
+ * Analyzes the given table in the current database to generate statistics, which will be
+ * used in query optimizations.
+ *
+ * Right now, it only supports catalog tables and it only updates the size of a catalog table
+ * in the external catalog.
+ */
def analyze(tableName: String): Unit = {
- throw new UnsupportedOperationException
+ AnalyzeTable(tableName).run(ctx)
}
def runNativeSql(sql: String): Seq[String] = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index ca397910c6..df2b6beac6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -392,7 +392,7 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => None // Cache miss
case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
- val pathsInMetastore = metastoreRelation.table.storage.locationUri.toSeq
+ val pathsInMetastore = metastoreRelation.catalogTable.storage.locationUri.toSeq
val cachedRelationFileFormatClass = relation.fileFormat.getClass
expectedFileFormat match {
@@ -467,7 +467,7 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
Some(partitionSpec))
val hadoopFsRelation = cached.getOrElse {
- val paths = new Path(metastoreRelation.table.storage.locationUri.get) :: Nil
+ val paths = new Path(metastoreRelation.catalogTable.storage.locationUri.get) :: Nil
val fileCatalog = new MetaStoreFileCatalog(hive, paths, partitionSpec)
val inferredSchema = if (fileType.equals("parquet")) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 6f4332c65f..4db0d78cfc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -24,12 +24,10 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.Analyzer
-import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlanner
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
-import org.apache.spark.sql.hive.execution.{AnalyzeTable, HiveSqlParser}
import org.apache.spark.sql.internal.{SessionState, SQLConf}
@@ -106,11 +104,6 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
}
/**
- * Parser for HiveQl query texts.
- */
- override lazy val sqlParser: ParserInterface = new HiveSqlParser(conf)
-
- /**
* Planner that takes into account Hive-specific strategies.
*/
override def planner: SparkPlanner = {
@@ -175,17 +168,6 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
}
/**
- * Analyzes the given table in the current database to generate statistics, which will be
- * used in query optimizations.
- *
- * Right now, it only supports Hive tables and it only updates the size of a Hive table
- * in the Hive metastore.
- */
- override def analyze(tableName: String): Unit = {
- AnalyzeTable(tableName).run(ctx)
- }
-
- /**
* Execute a SQL statement by passing the query text directly to Hive.
*/
override def runNativeSql(sql: String): Seq[String] = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index a66c325b8f..cd45706841 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTablePartition, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression}
import org.apache.spark.sql.catalyst.parser.DataTypeParser
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
@@ -37,13 +37,13 @@ import org.apache.spark.sql.hive.client.HiveClient
private[hive] case class MetastoreRelation(
- databaseName: String,
- tableName: String,
- alias: Option[String])
- (val table: CatalogTable,
- @transient private val client: HiveClient,
- @transient private val sqlContext: SQLContext)
- extends LeafNode with MultiInstanceRelation with FileRelation {
+ databaseName: String,
+ tableName: String,
+ alias: Option[String])
+ (val catalogTable: CatalogTable,
+ @transient private val client: HiveClient,
+ @transient private val sqlContext: SQLContext)
+ extends LeafNode with MultiInstanceRelation with FileRelation with CatalogRelation {
override def equals(other: Any): Boolean = other match {
case relation: MetastoreRelation =>
@@ -58,7 +58,7 @@ private[hive] case class MetastoreRelation(
Objects.hashCode(databaseName, tableName, alias, output)
}
- override protected def otherCopyArgs: Seq[AnyRef] = table :: sqlContext :: Nil
+ override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sqlContext :: Nil
private def toHiveColumn(c: CatalogColumn): FieldSchema = {
new FieldSchema(c.name, c.dataType, c.comment.orNull)
@@ -69,14 +69,14 @@ private[hive] case class MetastoreRelation(
// We start by constructing an API table as Hive performs several important transformations
// internally when converting an API table to a QL table.
val tTable = new org.apache.hadoop.hive.metastore.api.Table()
- tTable.setTableName(table.identifier.table)
- tTable.setDbName(table.database)
+ tTable.setTableName(catalogTable.identifier.table)
+ tTable.setDbName(catalogTable.database)
val tableParameters = new java.util.HashMap[String, String]()
tTable.setParameters(tableParameters)
- table.properties.foreach { case (k, v) => tableParameters.put(k, v) }
+ catalogTable.properties.foreach { case (k, v) => tableParameters.put(k, v) }
- tTable.setTableType(table.tableType match {
+ tTable.setTableType(catalogTable.tableType match {
case CatalogTableType.EXTERNAL_TABLE => HiveTableType.EXTERNAL_TABLE.toString
case CatalogTableType.MANAGED_TABLE => HiveTableType.MANAGED_TABLE.toString
case CatalogTableType.INDEX_TABLE => HiveTableType.INDEX_TABLE.toString
@@ -87,22 +87,22 @@ private[hive] case class MetastoreRelation(
tTable.setSd(sd)
// Note: In Hive the schema and partition columns must be disjoint sets
- val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
- table.partitionColumnNames.contains(c.getName)
+ val (partCols, schema) = catalogTable.schema.map(toHiveColumn).partition { c =>
+ catalogTable.partitionColumnNames.contains(c.getName)
}
sd.setCols(schema.asJava)
tTable.setPartitionKeys(partCols.asJava)
- table.storage.locationUri.foreach(sd.setLocation)
- table.storage.inputFormat.foreach(sd.setInputFormat)
- table.storage.outputFormat.foreach(sd.setOutputFormat)
+ catalogTable.storage.locationUri.foreach(sd.setLocation)
+ catalogTable.storage.inputFormat.foreach(sd.setInputFormat)
+ catalogTable.storage.outputFormat.foreach(sd.setOutputFormat)
val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
- table.storage.serde.foreach(serdeInfo.setSerializationLib)
+ catalogTable.storage.serde.foreach(serdeInfo.setSerializationLib)
sd.setSerdeInfo(serdeInfo)
val serdeParameters = new java.util.HashMap[String, String]()
- table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+ catalogTable.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
serdeInfo.setParameters(serdeParameters)
new HiveTable(tTable)
@@ -130,11 +130,11 @@ private[hive] case class MetastoreRelation(
// When metastore partition pruning is turned off, we cache the list of all partitions to
// mimic the behavior of Spark < 1.5
- private lazy val allPartitions: Seq[CatalogTablePartition] = client.getAllPartitions(table)
+ private lazy val allPartitions: Seq[CatalogTablePartition] = client.getAllPartitions(catalogTable)
def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
- client.getPartitionsByFilter(table, predicates)
+ client.getPartitionsByFilter(catalogTable, predicates)
} else {
allPartitions
}
@@ -147,7 +147,7 @@ private[hive] case class MetastoreRelation(
val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
tPartition.setSd(sd)
- sd.setCols(table.schema.map(toHiveColumn).asJava)
+ sd.setCols(catalogTable.schema.map(toHiveColumn).asJava)
p.storage.locationUri.foreach(sd.setLocation)
p.storage.inputFormat.foreach(sd.setInputFormat)
p.storage.outputFormat.foreach(sd.setOutputFormat)
@@ -158,7 +158,7 @@ private[hive] case class MetastoreRelation(
p.storage.serde.foreach(serdeInfo.setSerializationLib)
val serdeParameters = new java.util.HashMap[String, String]()
- table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+ catalogTable.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
p.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
serdeInfo.setParameters(serdeParameters)
@@ -195,12 +195,12 @@ private[hive] case class MetastoreRelation(
}
/** PartitionKey attributes */
- val partitionKeys = table.partitionColumns.map(_.toAttribute)
+ val partitionKeys = catalogTable.partitionColumns.map(_.toAttribute)
/** Non-partitionKey attributes */
// TODO: just make this hold the schema itself, not just non-partition columns
- val attributes = table.schema
- .filter { c => !table.partitionColumnNames.contains(c.name) }
+ val attributes = catalogTable.schema
+ .filter { c => !catalogTable.partitionColumnNames.contains(c.name) }
.map(_.toAttribute)
val output = attributes ++ partitionKeys
@@ -213,19 +213,19 @@ private[hive] case class MetastoreRelation(
override def inputFiles: Array[String] = {
val partLocations = client
- .getPartitionsByFilter(table, Nil)
+ .getPartitionsByFilter(catalogTable, Nil)
.flatMap(_.storage.locationUri)
.toArray
if (partLocations.nonEmpty) {
partLocations
} else {
Array(
- table.storage.locationUri.getOrElse(
- sys.error(s"Could not get the location of ${table.qualifiedName}.")))
+ catalogTable.storage.locationUri.getOrElse(
+ sys.error(s"Could not get the location of ${catalogTable.qualifiedName}.")))
}
}
override def newInstance(): MetastoreRelation = {
- MetastoreRelation(databaseName, tableName, alias)(table, client, sqlContext)
+ MetastoreRelation(databaseName, tableName, alias)(catalogTable, client, sqlContext)
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
deleted file mode 100644
index 35530b9814..0000000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import org.apache.spark.sql.catalyst.parser._
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.HiveNativeCommand
-import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
-
-/**
- * Concrete parser for HiveQl statements.
- */
-class HiveSqlParser(conf: SQLConf) extends AbstractSqlParser {
-
- val astBuilder = new HiveSqlAstBuilder(conf)
-
- private val substitutor = new VariableSubstitution(conf)
-
- protected override def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
- super.parse(substitutor.substitute(command))(toResult)
- }
-
- protected override def nativeCommand(sqlText: String): LogicalPlan = {
- HiveNativeCommand(substitutor.substitute(sqlText))
- }
-}
-
-/**
- * Builder that converts an ANTLR ParseTree into a LogicalPlan/Expression/TableIdentifier.
- */
-class HiveSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
-
- import ParserUtils._
-
- /**
- * Create an [[AnalyzeTable]] command. This currently only implements the NOSCAN option (other
- * options are passed on to Hive) e.g.:
- * {{{
- * ANALYZE TABLE table COMPUTE STATISTICS NOSCAN;
- * }}}
- */
- override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) {
- if (ctx.partitionSpec == null &&
- ctx.identifier != null &&
- ctx.identifier.getText.toLowerCase == "noscan") {
- AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
- } else {
- // Always just run the no scan analyze. We should fix this and implement full analyze
- // command in the future.
- AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
- }
- }
-}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 7e9669af8b..6899f46eec 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -17,109 +17,19 @@
package org.apache.spark.sql.hive.execution
-import scala.util.control.NonFatal
-
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.common.StatsSetupConst
-import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.MetaStoreUtils
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSource, HadoopFsRelation, LogicalRelation}
-import org.apache.spark.sql.hive.{HiveSessionState, MetastoreRelation}
+import org.apache.spark.sql.hive.HiveSessionState
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
-/**
- * Analyzes the given table in the current database to generate statistics, which will be
- * used in query optimizations.
- *
- * Right now, it only supports Hive tables and it only updates the size of a Hive table
- * in the Hive metastore.
- */
-private[hive]
-case class AnalyzeTable(tableName: String) extends RunnableCommand {
-
- override def run(sqlContext: SQLContext): Seq[Row] = {
- val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
- val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
- val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
-
- relation match {
- case relation: MetastoreRelation =>
- val catalogTable: CatalogTable = relation.table
- // This method is mainly based on
- // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
- // in Hive 0.13 (except that we do not use fs.getContentSummary).
- // TODO: Generalize statistics collection.
- // TODO: Why fs.getContentSummary returns wrong size on Jenkins?
- // Can we use fs.getContentSummary in future?
- // Seems fs.getContentSummary returns wrong table size on Jenkins. So we use
- // countFileSize to count the table size.
- val stagingDir = sessionState.metadataHive.getConf(
- HiveConf.ConfVars.STAGINGDIR.varname,
- HiveConf.ConfVars.STAGINGDIR.defaultStrVal)
-
- def calculateTableSize(fs: FileSystem, path: Path): Long = {
- val fileStatus = fs.getFileStatus(path)
- val size = if (fileStatus.isDirectory) {
- fs.listStatus(path)
- .map { status =>
- if (!status.getPath().getName().startsWith(stagingDir)) {
- calculateTableSize(fs, status.getPath)
- } else {
- 0L
- }
- }
- .sum
- } else {
- fileStatus.getLen
- }
-
- size
- }
-
- val tableParameters = catalogTable.properties
- val oldTotalSize = tableParameters.get("totalSize").map(_.toLong).getOrElse(0L)
- val newTotalSize =
- catalogTable.storage.locationUri.map { p =>
- val path = new Path(p)
- try {
- val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
- calculateTableSize(fs, path)
- } catch {
- case NonFatal(e) =>
- logWarning(
- s"Failed to get the size of table ${catalogTable.identifier.table} in the " +
- s"database ${catalogTable.identifier.database} because of ${e.toString}", e)
- 0L
- }
- }.getOrElse(0L)
-
- // Update the Hive metastore if the total size of the table is different than the size
- // recorded in the Hive metastore.
- // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
- if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
- sessionState.catalog.alterTable(
- catalogTable.copy(
- properties = relation.table.properties +
- (StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString)))
- }
-
- case otherRelation =>
- throw new UnsupportedOperationException(
- s"Analyze only works for Hive tables, but $tableName is a ${otherRelation.nodeName}")
- }
- Seq.empty[Row]
- }
-}
-
private[hive]
case class CreateMetastoreDataSource(
tableIdent: TableIdentifier,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 7a6f1ce0d1..565b310bb7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -21,9 +21,8 @@ import scala.reflect.ClassTag
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.HiveNativeCommand
+import org.apache.spark.sql.execution.command.AnalyzeTable
import org.apache.spark.sql.execution.joins._
-import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
@@ -117,7 +116,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
// Try to analyze a temp table
sql("""SELECT * FROM src""").registerTempTable("tempTable")
intercept[UnsupportedOperationException] {
- hiveContext.sessionState.analyze("tempTable")
+ hiveContext.sql("ANALYZE TABLE tempTable COMPUTE STATISTICS")
}
hiveContext.sessionState.catalog.dropTable(
TableIdentifier("tempTable"), ignoreIfNotExists = true)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
index 79ac53c863..12f30e2e74 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
@@ -153,7 +153,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
val (actualScannedColumns, actualPartValues) = plan.collect {
case p @ HiveTableScan(columns, relation, _) =>
val columnNames = columns.map(_.name)
- val partValues = if (relation.table.partitionColumnNames.nonEmpty) {
+ val partValues = if (relation.catalogTable.partitionColumnNames.nonEmpty) {
p.prunePartitions(relation.getHiveQlPartitions()).map(_.getValues)
} else {
Seq.empty