path: root/sql/hive
author     Andrew Or <andrew@databricks.com>       2016-04-16 15:35:51 -0700
committer  Yin Huai <yhuai@databricks.com>         2016-04-16 15:35:51 -0700
commit     3394b12c379fe0a423d73dc6316aadca18cd2110 (patch)
tree       56303b4648e483314028d44c9f1bd230c4aafb5a /sql/hive
parent     5cefecc95a5b8418713516802c416cfde5a94a2d (diff)
download   spark-3394b12c379fe0a423d73dc6316aadca18cd2110.tar.gz
           spark-3394b12c379fe0a423d73dc6316aadca18cd2110.tar.bz2
           spark-3394b12c379fe0a423d73dc6316aadca18cd2110.zip
[SPARK-14672][SQL] Move HiveContext analyze logic to AnalyzeTable
## What changes were proposed in this pull request?

Move the implementation of `hiveContext.analyze` into the `AnalyzeTable` command.

## How was this patch tested?

Existing tests.

Closes #12429

Author: Yin Huai <yhuai@databricks.com>
Author: Andrew Or <andrew@databricks.com>

Closes #12448 from yhuai/analyzeTable.
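After this change, `HiveContext.analyze` is a thin wrapper around the command. A minimal sketch of the resulting call path, assuming an already constructed `HiveContext` named `hiveContext` and a placeholder table name `my_table` (both are illustrative assumptions, not part of the patch):

```scala
// Sketch only: `my_table` is a placeholder; AnalyzeTable is private[hive],
// so the second call is only legal from within that package.
hiveContext.analyze("my_table")            // public API, now a one-line delegation
AnalyzeTable("my_table").run(hiveContext)  // what analyze() does internally after this patch
```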
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala        | 78
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala | 81
2 files changed, 81 insertions(+), 78 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index e366743118..71ef99a6a9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -29,12 +29,9 @@ import scala.collection.mutable.HashMap
import scala.language.implicitConversions
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.ql.metadata.Table
import org.apache.hadoop.hive.ql.parse.VariableSubstitution
import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
import org.apache.hadoop.util.VersionInfo
@@ -45,13 +42,12 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command.{ExecutedCommand, SetCommand}
import org.apache.spark.sql.hive.client._
-import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
+import org.apache.spark.sql.hive.execution.{AnalyzeTable, DescribeHiveTableCommand, HiveNativeCommand}
import org.apache.spark.sql.internal.{SharedState, SQLConf}
import org.apache.spark.sql.internal.SQLConf._
import org.apache.spark.sql.types._
@@ -192,77 +188,7 @@ class HiveContext private[hive](
* @since 1.2.0
*/
def analyze(tableName: String) {
- val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
- val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
-
- relation match {
- case relation: MetastoreRelation =>
- // This method is mainly based on
- // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
- // in Hive 0.13 (except that we do not use fs.getContentSummary).
- // TODO: Generalize statistics collection.
- // TODO: Why fs.getContentSummary returns wrong size on Jenkins?
- // Can we use fs.getContentSummary in future?
- // Seems fs.getContentSummary returns wrong table size on Jenkins. So we use
- // countFileSize to count the table size.
- val stagingDir = metadataHive.getConf(HiveConf.ConfVars.STAGINGDIR.varname,
- HiveConf.ConfVars.STAGINGDIR.defaultStrVal)
-
- def calculateTableSize(fs: FileSystem, path: Path): Long = {
- val fileStatus = fs.getFileStatus(path)
- val size = if (fileStatus.isDirectory) {
- fs.listStatus(path)
- .map { status =>
- if (!status.getPath().getName().startsWith(stagingDir)) {
- calculateTableSize(fs, status.getPath)
- } else {
- 0L
- }
- }
- .sum
- } else {
- fileStatus.getLen
- }
-
- size
- }
-
- def getFileSizeForTable(conf: HiveConf, table: Table): Long = {
- val path = table.getPath
- var size: Long = 0L
- try {
- val fs = path.getFileSystem(conf)
- size = calculateTableSize(fs, path)
- } catch {
- case e: Exception =>
- logWarning(
- s"Failed to get the size of table ${table.getTableName} in the " +
- s"database ${table.getDbName} because of ${e.toString}", e)
- size = 0L
- }
-
- size
- }
-
- val tableParameters = relation.hiveQlTable.getParameters
- val oldTotalSize =
- Option(tableParameters.get(StatsSetupConst.TOTAL_SIZE))
- .map(_.toLong)
- .getOrElse(0L)
- val newTotalSize = getFileSizeForTable(hiveconf, relation.hiveQlTable)
- // Update the Hive metastore if the total size of the table is different than the size
- // recorded in the Hive metastore.
- // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
- if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
- sessionState.catalog.alterTable(
- relation.table.copy(
- properties = relation.table.properties +
- (StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString)))
- }
- case otherRelation =>
- throw new UnsupportedOperationException(
- s"Analyze only works for Hive tables, but $tableName is a ${otherRelation.nodeName}")
- }
+ AnalyzeTable(tableName).run(self)
}
override def setConf(key: String, value: String): Unit = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 06badff474..0c06608ff9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -17,7 +17,11 @@
package org.apache.spark.sql.hive.execution
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.common.StatsSetupConst
+import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.MetaStoreUtils
+import org.apache.hadoop.hive.ql.metadata.Table
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -26,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSource, LogicalRelation}
-import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -41,7 +45,80 @@ private[hive]
case class AnalyzeTable(tableName: String) extends RunnableCommand {
override def run(sqlContext: SQLContext): Seq[Row] = {
- sqlContext.asInstanceOf[HiveContext].analyze(tableName)
+ val sessionState = sqlContext.sessionState
+ val hiveContext = sqlContext.asInstanceOf[HiveContext]
+ val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
+ val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
+
+ relation match {
+ case relation: MetastoreRelation =>
+ // This method is mainly based on
+ // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
+ // in Hive 0.13 (except that we do not use fs.getContentSummary).
+ // TODO: Generalize statistics collection.
+ // TODO: Why fs.getContentSummary returns wrong size on Jenkins?
+ // Can we use fs.getContentSummary in future?
+ // Seems fs.getContentSummary returns wrong table size on Jenkins. So we use
+ // countFileSize to count the table size.
+ val stagingDir = hiveContext.metadataHive.getConf(
+ HiveConf.ConfVars.STAGINGDIR.varname,
+ HiveConf.ConfVars.STAGINGDIR.defaultStrVal)
+
+ def calculateTableSize(fs: FileSystem, path: Path): Long = {
+ val fileStatus = fs.getFileStatus(path)
+ val size = if (fileStatus.isDirectory) {
+ fs.listStatus(path)
+ .map { status =>
+ if (!status.getPath().getName().startsWith(stagingDir)) {
+ calculateTableSize(fs, status.getPath)
+ } else {
+ 0L
+ }
+ }
+ .sum
+ } else {
+ fileStatus.getLen
+ }
+
+ size
+ }
+
+ def getFileSizeForTable(conf: HiveConf, table: Table): Long = {
+ val path = table.getPath
+ var size: Long = 0L
+ try {
+ val fs = path.getFileSystem(conf)
+ size = calculateTableSize(fs, path)
+ } catch {
+ case e: Exception =>
+ logWarning(
+ s"Failed to get the size of table ${table.getTableName} in the " +
+ s"database ${table.getDbName} because of ${e.toString}", e)
+ size = 0L
+ }
+
+ size
+ }
+
+ val tableParameters = relation.hiveQlTable.getParameters
+ val oldTotalSize =
+ Option(tableParameters.get(StatsSetupConst.TOTAL_SIZE))
+ .map(_.toLong)
+ .getOrElse(0L)
+ val newTotalSize = getFileSizeForTable(hiveContext.hiveconf, relation.hiveQlTable)
+ // Update the Hive metastore if the total size of the table is different than the size
+ // recorded in the Hive metastore.
+ // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
+ if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
+ sessionState.catalog.alterTable(
+ relation.table.copy(
+ properties = relation.table.properties +
+ (StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString)))
+ }
+ case otherRelation =>
+ throw new UnsupportedOperationException(
+ s"Analyze only works for Hive tables, but $tableName is a ${otherRelation.nodeName}")
+ }
Seq.empty[Row]
}
}
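For context on what the moved code computes, the following standalone sketch mirrors the recursive size walk performed inside `AnalyzeTable`, written against plain Hadoop `FileSystem` APIs. The warehouse path and the `.hive-staging` prefix (the common default of `hive.exec.stagingdir`) are illustrative assumptions, not values taken from this patch.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sum the lengths of all files under a table directory, skipping any
// subdirectory whose name starts with the Hive staging-dir prefix,
// mirroring calculateTableSize in the diff above.
def directorySize(fs: FileSystem, path: Path, stagingDirPrefix: String): Long = {
  val status = fs.getFileStatus(path)
  if (status.isDirectory) {
    fs.listStatus(path).map { child =>
      if (child.getPath.getName.startsWith(stagingDirPrefix)) 0L
      else directorySize(fs, child.getPath, stagingDirPrefix)
    }.sum
  } else {
    status.getLen
  }
}

// Hypothetical usage with an assumed warehouse path.
val tablePath = new Path("/user/hive/warehouse/my_table")
val fs = tablePath.getFileSystem(new Configuration())
val newTotalSize = directorySize(fs, tablePath, ".hive-staging")
```

As in the patch, a caller would then compare this value with the table's recorded `totalSize` property and alter the table only when the freshly computed size is positive and differs from the stored one.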