author:    Yin Huai <huai@cse.ohio-state.edu>    2014-08-03 14:54:41 -0700
committer: Michael Armbrust <michael@databricks.com>    2014-08-03 14:54:41 -0700
commit:    e139e2be60ef23281327744e1b3e74904dfdf63f (patch)
tree:      1a2dc996b643a9e50d5084828ac1d01ff72e622c /sql
parent:    ac33cbbf33bd1ab29bc8165c9be02fb8934b1fdf (diff)
[SPARK-2783][SQL] Basic support for analyze in HiveContext
JIRA: https://issues.apache.org/jira/browse/SPARK-2783

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #1741 from yhuai/analyzeTable and squashes the following commits:

7bb5f02 [Yin Huai] Use sql instead of hql.
4d09325 [Yin Huai] Merge remote-tracking branch 'upstream/master' into analyzeTable
e3ebcd4 [Yin Huai] Renaming.
c170f4e [Yin Huai] Do not use getContentSummary.
62393b6 [Yin Huai] Merge remote-tracking branch 'upstream/master' into analyzeTable
db233a6 [Yin Huai] Trying to debug jenkins...
fee84f0 [Yin Huai] Merge remote-tracking branch 'upstream/master' into analyzeTable
f0501f3 [Yin Huai] Fix compilation error.
24ad391 [Yin Huai] Merge remote-tracking branch 'upstream/master' into analyzeTable
8918140 [Yin Huai] Wording.
23df227 [Yin Huai] Add a simple analyze method to get the size of a table and update the "totalSize" property of this table in the Hive metastore.
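In short, the patch adds an analyze(tableName) method to HiveContext. A minimal usage sketch, mirroring the new test added below and assuming an existing SparkContext and a populated source table `src`; the table name `records` is illustrative, not part of the patch:

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext("local", "analyze-example")  // assumed setup
val hiveContext = new HiveContext(sc)

// Create and populate a Hive table to analyze.
hiveContext.sql("CREATE TABLE records (key STRING, value STRING)")
hiveContext.sql("INSERT INTO TABLE records SELECT * FROM src")

// Scan the table's files and write the computed size back to the table's
// "totalSize" property in the Hive metastore; only Hive tables are supported.
hiveContext.analyze("records")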
Diffstat (limited to 'sql')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala          | 79
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala |  5
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala      | 54
3 files changed, 136 insertions, 2 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index acad681f68..d8e7a5943d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -25,10 +25,14 @@ import scala.collection.JavaConversions._
import scala.language.implicitConversions
import scala.reflect.runtime.universe.{TypeTag, typeTag}
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.Driver
+import org.apache.hadoop.hive.ql.metadata.Table
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.hadoop.hive.ql.stats.StatsSetupConst
import org.apache.hadoop.hive.serde2.io.TimestampWritable
import org.apache.spark.SparkContext
@@ -107,6 +111,81 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting)
}
+ /**
+ * Analyzes the given table in the current database to generate statistics, which will be
+ * used in query optimizations.
+ *
+ * Right now, it only supports Hive tables and it only updates the size of a Hive table
+ * in the Hive metastore.
+ */
+ def analyze(tableName: String) {
+ val relation = catalog.lookupRelation(None, tableName) match {
+ case LowerCaseSchema(r) => r
+ case o => o
+ }
+
+ relation match {
+ case relation: MetastoreRelation => {
+ // This method is mainly based on
+ // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
+ // in Hive 0.13 (except that we do not use fs.getContentSummary).
+ // TODO: Generalize statistics collection.
+ // TODO: Why does fs.getContentSummary return the wrong size on Jenkins?
+ //       Can we use fs.getContentSummary in the future?
+ // fs.getContentSummary seems to return the wrong table size on Jenkins, so we use
+ // calculateTableSize to compute the table size instead.
+ def calculateTableSize(fs: FileSystem, path: Path): Long = {
+ val fileStatus = fs.getFileStatus(path)
+ val size = if (fileStatus.isDir) {
+ fs.listStatus(path).map(status => calculateTableSize(fs, status.getPath)).sum
+ } else {
+ fileStatus.getLen
+ }
+
+ size
+ }
+
+ def getFileSizeForTable(conf: HiveConf, table: Table): Long = {
+ val path = table.getPath()
+ var size: Long = 0L
+ try {
+ val fs = path.getFileSystem(conf)
+ size = calculateTableSize(fs, path)
+ } catch {
+ case e: Exception =>
+ logWarning(
+ s"Failed to get the size of table ${table.getTableName} in the " +
+ s"database ${table.getDbName} because of ${e.toString}", e)
+ size = 0L
+ }
+
+ size
+ }
+
+ val tableParameters = relation.hiveQlTable.getParameters
+ val oldTotalSize =
+ Option(tableParameters.get(StatsSetupConst.TOTAL_SIZE)).map(_.toLong).getOrElse(0L)
+ val newTotalSize = getFileSizeForTable(hiveconf, relation.hiveQlTable)
+ // Update the Hive metastore if the total size of the table is different than the size
+ // recorded in the Hive metastore.
+ // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
+ if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
+ tableParameters.put(StatsSetupConst.TOTAL_SIZE, newTotalSize.toString)
+ val hiveTTable = relation.hiveQlTable.getTTable
+ hiveTTable.setParameters(tableParameters)
+ val tableFullName =
+ relation.hiveQlTable.getDbName() + "." + relation.hiveQlTable.getTableName()
+
+ catalog.client.alterTable(tableFullName, new Table(hiveTTable))
+ }
+ }
+ case otherRelation =>
+ throw new NotImplementedError(
+ s"Analyze has only implemented for Hive tables, " +
+ s"but ${tableName} is a ${otherRelation.nodeName}")
+ }
+ }
+
// Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur.
@transient
protected lazy val outputBuffer = new java.io.OutputStream {
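The TODO above notes that fs.getContentSummary appeared to return the wrong size on Jenkins, which is why the patch sums file sizes recursively instead. For comparison only, a sketch of that avoided alternative using the standard Hadoop FileSystem API; it is not part of this patch:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical alternative to calculateTableSize: ask the file system to
// aggregate everything under `path` in a single call.
def contentSummarySize(conf: Configuration, path: Path): Long = {
  val fs: FileSystem = path.getFileSystem(conf)
  // ContentSummary.getLength is the total length of all files under `path`.
  fs.getContentSummary(path).getLength
}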
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index df3604439e..301cf51c00 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.metastore.api.{FieldSchema, StorageDescriptor, Ser
import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition}
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.ql.stats.StatsSetupConst
import org.apache.hadoop.hive.serde2.Deserializer
import org.apache.spark.annotation.DeveloperApi
@@ -278,9 +279,9 @@ private[hive] case class MetastoreRelation
// relatively cheap if parameters for the table are populated into the metastore. An
// alternative would be going through Hadoop's FileSystem API, which can be expensive if a lot
// of RPCs are involved. Besides `totalSize`, there are also `numFiles`, `numRows`,
- // `rawDataSize` keys that we can look at in the future.
+ // `rawDataSize` keys (see StatsSetupConst in Hive) that we can look at in the future.
BigInt(
- Option(hiveQlTable.getParameters.get("totalSize"))
+ Option(hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE))
.map(_.toLong)
.getOrElse(sqlContext.defaultSizeInBytes))
}
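The comment above also mentions the `numFiles`, `numRows`, and `rawDataSize` keys as candidates for future use. A rough sketch of how such keys could be read from the same parameter map, using raw key strings; this helper is purely illustrative and not part of the patch:

import java.util.{Map => JMap}

// Illustrative only: read an optional Hive statistics key from a table's
// parameter map, treating a missing or unpopulated key as unknown (None).
def statAsLong(params: JMap[String, String], key: String): Option[Long] =
  Option(params.get(key)).map(_.toLong)

// Besides "totalSize", the metastore may also carry these keys.
def tableStats(params: JMap[String, String]): Map[String, Option[Long]] =
  Seq("totalSize", "numFiles", "numRows", "rawDataSize")
    .map(key => key -> statAsLong(params, key)).toMap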
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index d8c77d6021..bf5931bbf9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -26,6 +26,60 @@ import org.apache.spark.sql.hive.test.TestHive._
class StatisticsSuite extends QueryTest {
+ test("analyze MetastoreRelations") {
+ def queryTotalSize(tableName: String): BigInt =
+ catalog.lookupRelation(None, tableName).statistics.sizeInBytes
+
+ // Non-partitioned table
+ sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect()
+ sql("INSERT INTO TABLE analyzeTable SELECT * FROM src").collect()
+ sql("INSERT INTO TABLE analyzeTable SELECT * FROM src").collect()
+
+ assert(queryTotalSize("analyzeTable") === defaultSizeInBytes)
+
+ analyze("analyzeTable")
+
+ assert(queryTotalSize("analyzeTable") === BigInt(11624))
+
+ sql("DROP TABLE analyzeTable").collect()
+
+ // Partitioned table
+ sql(
+ """
+ |CREATE TABLE analyzeTable_part (key STRING, value STRING) PARTITIONED BY (ds STRING)
+ """.stripMargin).collect()
+ sql(
+ """
+ |INSERT INTO TABLE analyzeTable_part PARTITION (ds='2010-01-01')
+ |SELECT * FROM src
+ """.stripMargin).collect()
+ sql(
+ """
+ |INSERT INTO TABLE analyzeTable_part PARTITION (ds='2010-01-02')
+ |SELECT * FROM src
+ """.stripMargin).collect()
+ sql(
+ """
+ |INSERT INTO TABLE analyzeTable_part PARTITION (ds='2010-01-03')
+ |SELECT * FROM src
+ """.stripMargin).collect()
+
+ assert(queryTotalSize("analyzeTable_part") === defaultSizeInBytes)
+
+ analyze("analyzeTable_part")
+
+ assert(queryTotalSize("analyzeTable_part") === BigInt(17436))
+
+ sql("DROP TABLE analyzeTable_part").collect()
+
+ // Try to analyze a temp table
+ sql("""SELECT * FROM src""").registerTempTable("tempTable")
+ intercept[NotImplementedError] {
+ analyze("tempTable")
+ }
+ catalog.unregisterTable(None, "tempTable")
+ }
+
test("estimates the size of a test MetastoreRelation") {
val rdd = sql("""SELECT * FROM src""")
val sizes = rdd.queryExecution.analyzed.collect { case mr: MetastoreRelation =>