 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala | 49 +++++++++++++++++++++++--------------------------
 1 file changed, 23 insertions(+), 26 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 78f8bfe59f..7e9669af8b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -17,16 +17,17 @@
package org.apache.spark.sql.hive.execution
+import scala.util.control.NonFatal
+
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.MetaStoreUtils
-import org.apache.hadoop.hive.ql.metadata.Table
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSource, HadoopFsRelation, LogicalRelation}
@@ -34,6 +35,7 @@ import org.apache.spark.sql.hive.{HiveSessionState, MetastoreRelation}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
+
/**
* Analyzes the given table in the current database to generate statistics, which will be
* used in query optimizations.
@@ -51,6 +53,7 @@ case class AnalyzeTable(tableName: String) extends RunnableCommand {
relation match {
case relation: MetastoreRelation =>
+ val catalogTable: CatalogTable = relation.table
// This method is mainly based on
// org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
// in Hive 0.13 (except that we do not use fs.getContentSummary).
@@ -82,39 +85,33 @@ case class AnalyzeTable(tableName: String) extends RunnableCommand {
size
}
- def getFileSizeForTable(conf: HiveConf, table: Table): Long = {
- val path = table.getPath
- var size: Long = 0L
- try {
- val fs = path.getFileSystem(conf)
- size = calculateTableSize(fs, path)
- } catch {
- case e: Exception =>
- logWarning(
- s"Failed to get the size of table ${table.getTableName} in the " +
- s"database ${table.getDbName} because of ${e.toString}", e)
- size = 0L
- }
-
- size
- }
-
- val tableParameters = relation.hiveQlTable.getParameters
- val oldTotalSize =
- Option(tableParameters.get(StatsSetupConst.TOTAL_SIZE))
- .map(_.toLong)
- .getOrElse(0L)
+ val tableParameters = catalogTable.properties
+ val oldTotalSize = tableParameters.get("totalSize").map(_.toLong).getOrElse(0L)
val newTotalSize =
- getFileSizeForTable(sessionState.hiveconf, relation.hiveQlTable)
+ catalogTable.storage.locationUri.map { p =>
+ val path = new Path(p)
+ try {
+ val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ calculateTableSize(fs, path)
+ } catch {
+ case NonFatal(e) =>
+ logWarning(
+ s"Failed to get the size of table ${catalogTable.identifier.table} in the " +
+ s"database ${catalogTable.identifier.database} because of ${e.toString}", e)
+ 0L
+ }
+ }.getOrElse(0L)
+
// Update the Hive metastore if the total size of the table is different than the size
// recorded in the Hive metastore.
// This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
sessionState.catalog.alterTable(
- relation.table.copy(
+ catalogTable.copy(
properties = relation.table.properties +
(StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString)))
}
+
case otherRelation =>
throw new UnsupportedOperationException(
s"Analyze only works for Hive tables, but $tableName is a ${otherRelation.nodeName}")