aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
blob: e6c5351106c478831e3787cd8fed05c1d3aaf73d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.command

import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}


/**
 * Analyzes the given table in the current database to generate statistics, which will be
 * used in query optimizations.
 *
 * Right now, it only supports Hive tables and it only updates the size of a Hive table
 * in the Hive metastore.
 *
 * @param tableName name of the table to analyze; it is parsed into a table identifier by the
 *                  session's SQL parser.
 */
case class AnalyzeTable(tableName: String) extends RunnableCommand {

  /**
   * Computes the total size in bytes of the files under the table's location and, when that
   * size is positive and differs from the recorded one, stores it in the table properties
   * under [[AnalyzeTable.TOTAL_SIZE_FIELD]] through the session catalog.
   *
   * @param sqlContext context whose session state supplies the parser, catalog and configuration.
   * @return always an empty sequence; the command is run for its effect on the catalog.
   * @throws UnsupportedOperationException if the table does not resolve to a `CatalogRelation`.
   */
  override def run(sqlContext: SQLContext): Seq[Row] = {
    val sessionState = sqlContext.sessionState
    val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
    val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))

    relation match {
      case relation: CatalogRelation =>
        val catalogTable: CatalogTable = relation.catalogTable
        // This method is mainly based on
        // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
        // in Hive 0.13 (except that we do not use fs.getContentSummary).
        // TODO: Generalize statistics collection.
        // TODO: fs.getContentSummary seems to return a wrong table size on Jenkins, so the
        // size is counted file by file below. Find out why, and whether fs.getContentSummary
        // can be used in the future.
        val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")

        // Sums the lengths of all files below `path`, skipping any child whose name starts
        // with the staging directory prefix.
        def calculateTableSize(fs: FileSystem, path: Path): Long = {
          val fileStatus = fs.getFileStatus(path)
          if (fileStatus.isDirectory) {
            fs.listStatus(path)
              .map { status =>
                if (!status.getPath.getName.startsWith(stagingDir)) {
                  calculateTableSize(fs, status.getPath)
                } else {
                  0L
                }
              }.sum
          } else {
            fileStatus.getLen
          }
        }

        val tableParameters = catalogTable.properties
        // Read the previous size through the same key that is used for writing it below. A
        // value that is not a valid long is treated as "no size recorded" so that it gets
        // overwritten instead of aborting the command with a NumberFormatException.
        val oldTotalSize = tableParameters.get(AnalyzeTable.TOTAL_SIZE_FIELD).map { raw =>
          try {
            raw.toLong
          } catch {
            case _: NumberFormatException =>
              logWarning(
                s"Ignoring malformed ${AnalyzeTable.TOTAL_SIZE_FIELD} value '$raw' recorded " +
                  s"for table $tableName")
              0L
          }
        }.getOrElse(0L)

        // A table without a location, or one whose size cannot be computed, counts as 0 bytes,
        // which leaves the recorded size untouched (see the guard below).
        val newTotalSize =
          catalogTable.storage.locationUri.map { p =>
            val path = new Path(p)
            try {
              val fs = path.getFileSystem(sessionState.hadoopConf)
              calculateTableSize(fs, path)
            } catch {
              case NonFatal(e) =>
                logWarning(
                  s"Failed to get the size of table ${catalogTable.identifier.table} in the " +
                    s"database ${catalogTable.identifier.database} because of ${e.toString}", e)
                0L
            }
          }.getOrElse(0L)

        // Update the Hive metastore if the total size of the table is different than the size
        // recorded in the Hive metastore.
        // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
        if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
          sessionState.catalog.alterTable(
            catalogTable.copy(
              properties = tableParameters +
                (AnalyzeTable.TOTAL_SIZE_FIELD -> newTotalSize.toString)))
        }

      case otherRelation =>
        throw new UnsupportedOperationException(
          s"Analyze only works for Hive tables, but $tableName is a ${otherRelation.nodeName}")
    }
    Seq.empty[Row]
  }
}

/** Constants used by the [[AnalyzeTable]] command. */
object AnalyzeTable {

  /** Key of the table property under which the computed total table size is stored. */
  val TOTAL_SIZE_FIELD: String = "totalSize"
}