From e139e2be60ef23281327744e1b3e74904dfdf63f Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Sun, 3 Aug 2014 14:54:41 -0700 Subject: [SPARK-2783][SQL] Basic support for analyze in HiveContext JIRA: https://issues.apache.org/jira/browse/SPARK-2783 Author: Yin Huai Closes #1741 from yhuai/analyzeTable and squashes the following commits: 7bb5f02 [Yin Huai] Use sql instead of hql. 4d09325 [Yin Huai] Merge remote-tracking branch 'upstream/master' into analyzeTable e3ebcd4 [Yin Huai] Renaming. c170f4e [Yin Huai] Do not use getContentSummary. 62393b6 [Yin Huai] Merge remote-tracking branch 'upstream/master' into analyzeTable db233a6 [Yin Huai] Trying to debug jenkins... fee84f0 [Yin Huai] Merge remote-tracking branch 'upstream/master' into analyzeTable f0501f3 [Yin Huai] Fix compilation error. 24ad391 [Yin Huai] Merge remote-tracking branch 'upstream/master' into analyzeTable 8918140 [Yin Huai] Wording. 23df227 [Yin Huai] Add a simple analyze method to get the size of a table and update the "totalSize" property of this table in the Hive metastore. --- .../org/apache/spark/sql/hive/HiveContext.scala | 79 ++++++++++++++++++++++ .../spark/sql/hive/HiveMetastoreCatalog.scala | 5 +- .../apache/spark/sql/hive/StatisticsSuite.scala | 54 +++++++++++++++ 3 files changed, 136 insertions(+), 2 deletions(-) (limited to 'sql') diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index acad681f68..d8e7a5943d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -25,10 +25,14 @@ import scala.collection.JavaConversions._ import scala.language.implicitConversions import scala.reflect.runtime.universe.{TypeTag, typeTag} +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.ql.Driver +import org.apache.hadoop.hive.ql.metadata.Table import org.apache.hadoop.hive.ql.processors._ import org.apache.hadoop.hive.ql.session.SessionState +import org.apache.hadoop.hive.ql.stats.StatsSetupConst import org.apache.hadoop.hive.serde2.io.TimestampWritable import org.apache.spark.SparkContext @@ -107,6 +111,81 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting) } + /** + * Analyzes the given table in the current database to generate statistics, which will be + * used in query optimizations. + * + * Right now, it only supports Hive tables and it only updates the size of a Hive table + * in the Hive metastore. + */ + def analyze(tableName: String) { + val relation = catalog.lookupRelation(None, tableName) match { + case LowerCaseSchema(r) => r + case o => o + } + + relation match { + case relation: MetastoreRelation => { + // This method is mainly based on + // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table) + // in Hive 0.13 (except that we do not use fs.getContentSummary). + // TODO: Generalize statistics collection. + // TODO: Why fs.getContentSummary returns wrong size on Jenkins? + // Can we use fs.getContentSummary in future? + // Seems fs.getContentSummary returns wrong table size on Jenkins. So we use + // countFileSize to count the table size. + def calculateTableSize(fs: FileSystem, path: Path): Long = { + val fileStatus = fs.getFileStatus(path) + val size = if (fileStatus.isDir) { + fs.listStatus(path).map(status => calculateTableSize(fs, status.getPath)).sum + } else { + fileStatus.getLen + } + + size + } + + def getFileSizeForTable(conf: HiveConf, table: Table): Long = { + val path = table.getPath() + var size: Long = 0L + try { + val fs = path.getFileSystem(conf) + size = calculateTableSize(fs, path) + } catch { + case e: Exception => + logWarning( + s"Failed to get the size of table ${table.getTableName} in the " + + s"database ${table.getDbName} because of ${e.toString}", e) + size = 0L + } + + size + } + + val tableParameters = relation.hiveQlTable.getParameters + val oldTotalSize = + Option(tableParameters.get(StatsSetupConst.TOTAL_SIZE)).map(_.toLong).getOrElse(0L) + val newTotalSize = getFileSizeForTable(hiveconf, relation.hiveQlTable) + // Update the Hive metastore if the total size of the table is different than the size + // recorded in the Hive metastore. + // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats(). + if (newTotalSize > 0 && newTotalSize != oldTotalSize) { + tableParameters.put(StatsSetupConst.TOTAL_SIZE, newTotalSize.toString) + val hiveTTable = relation.hiveQlTable.getTTable + hiveTTable.setParameters(tableParameters) + val tableFullName = + relation.hiveQlTable.getDbName() + "." + relation.hiveQlTable.getTableName() + + catalog.client.alterTable(tableFullName, new Table(hiveTTable)) + } + } + case otherRelation => + throw new NotImplementedError( + s"Analyze has only implemented for Hive tables, " + + s"but ${tableName} is a ${otherRelation.nodeName}") + } + } + // Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur. @transient protected lazy val outputBuffer = new java.io.OutputStream { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index df3604439e..301cf51c00 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.metastore.api.{FieldSchema, StorageDescriptor, Ser import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition} import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table} import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.ql.stats.StatsSetupConst import org.apache.hadoop.hive.serde2.Deserializer import org.apache.spark.annotation.DeveloperApi @@ -278,9 +279,9 @@ private[hive] case class MetastoreRelation // relatively cheap if parameters for the table are populated into the metastore. An // alternative would be going through Hadoop's FileSystem API, which can be expensive if a lot // of RPCs are involved. Besides `totalSize`, there are also `numFiles`, `numRows`, - // `rawDataSize` keys that we can look at in the future. + // `rawDataSize` keys (see StatsSetupConst in Hive) that we can look at in the future. BigInt( - Option(hiveQlTable.getParameters.get("totalSize")) + Option(hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)) .map(_.toLong) .getOrElse(sqlContext.defaultSizeInBytes)) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index d8c77d6021..bf5931bbf9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -26,6 +26,60 @@ import org.apache.spark.sql.hive.test.TestHive._ class StatisticsSuite extends QueryTest { + test("analyze MetastoreRelations") { + def queryTotalSize(tableName: String): BigInt = + catalog.lookupRelation(None, tableName).statistics.sizeInBytes + + // Non-partitioned table + sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect() + sql("INSERT INTO TABLE analyzeTable SELECT * FROM src").collect() + sql("INSERT INTO TABLE analyzeTable SELECT * FROM src").collect() + + assert(queryTotalSize("analyzeTable") === defaultSizeInBytes) + + analyze("analyzeTable") + + assert(queryTotalSize("analyzeTable") === BigInt(11624)) + + sql("DROP TABLE analyzeTable").collect() + + // Partitioned table + sql( + """ + |CREATE TABLE analyzeTable_part (key STRING, value STRING) PARTITIONED BY (ds STRING) + """.stripMargin).collect() + sql( + """ + |INSERT INTO TABLE analyzeTable_part PARTITION (ds='2010-01-01') + |SELECT * FROM src + """.stripMargin).collect() + sql( + """ + |INSERT INTO TABLE analyzeTable_part PARTITION (ds='2010-01-02') + |SELECT * FROM src + """.stripMargin).collect() + sql( + """ + |INSERT INTO TABLE analyzeTable_part PARTITION (ds='2010-01-03') + |SELECT * FROM src + """.stripMargin).collect() + + assert(queryTotalSize("analyzeTable_part") === defaultSizeInBytes) + + analyze("analyzeTable_part") + + assert(queryTotalSize("analyzeTable_part") === BigInt(17436)) + + sql("DROP TABLE analyzeTable_part").collect() + + // Try to analyze a temp table + sql("""SELECT * FROM src""").registerTempTable("tempTable") + intercept[NotImplementedError] { + analyze("tempTable") + } + catalog.unregisterTable(None, "tempTable") + } + test("estimates the size of a test MetastoreRelation") { val rdd = sql("""SELECT * FROM src""") val sizes = rdd.queryExecution.analyzed.collect { case mr: MetastoreRelation => -- cgit v1.2.3