From 42389b1780311d90499b4ce2315ceabf5b6ab384 Mon Sep 17 00:00:00 2001 From: w00228970 Date: Mon, 17 Nov 2014 16:33:50 -0800 Subject: [SPARK-4443][SQL] Fix statistics for external table in spark sql hive The `totalSize` of external table is always zero, which will influence join strategy(always use broadcast join for external table). Author: w00228970 Closes #3304 from scwf/statistics and squashes the following commits: 568f321 [w00228970] fix statistics for external table --- .../org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 11 ++++++++--- .../src/main/scala/org/apache/spark/sql/hive/Shim12.scala | 2 ++ .../src/main/scala/org/apache/spark/sql/hive/Shim13.scala | 2 ++ 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 9045fc8558..91a157785d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -447,6 +447,8 @@ private[hive] case class MetastoreRelation @transient override lazy val statistics = Statistics( sizeInBytes = { + val totalSize = hiveQlTable.getParameters.get(HiveShim.getStatsSetupConstTotalSize) + val rawDataSize = hiveQlTable.getParameters.get(HiveShim.getStatsSetupConstRawDataSize) // TODO: check if this estimate is valid for tables after partition pruning. // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be // relatively cheap if parameters for the table are populated into the metastore. An @@ -454,9 +456,12 @@ private[hive] case class MetastoreRelation // of RPCs are involved. Besides `totalSize`, there are also `numFiles`, `numRows`, // `rawDataSize` keys (see StatsSetupConst in Hive) that we can look at in the future. BigInt( - Option(hiveQlTable.getParameters.get(HiveShim.getStatsSetupConstTotalSize)) - .map(_.toLong) - .getOrElse(sqlContext.defaultSizeInBytes)) + // When table is external,`totalSize` is always zero, which will influence join strategy + // so when `totalSize` is zero, use `rawDataSize` instead + // if the size is still less than zero, we use default size + Option(totalSize).map(_.toLong).filter(_ > 0) + .getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0) + .getOrElse(sqlContext.defaultSizeInBytes))) } ) diff --git a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala index 8ba25f889d..76f09cbcde 100644 --- a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala +++ b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala @@ -136,6 +136,8 @@ private[hive] object HiveShim { def getStatsSetupConstTotalSize = StatsSetupConst.TOTAL_SIZE + def getStatsSetupConstRawDataSize = StatsSetupConst.RAW_DATA_SIZE + def createDefaultDBIfNeeded(context: HiveContext) = { } def getCommandProcessor(cmd: Array[String], conf: HiveConf) = { diff --git a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala index e4aee57f0a..91f7ceac21 100644 --- a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala +++ b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala @@ -154,6 +154,8 @@ private[hive] object HiveShim { def getStatsSetupConstTotalSize = StatsSetupConst.TOTAL_SIZE + def getStatsSetupConstRawDataSize = StatsSetupConst.RAW_DATA_SIZE + def createDefaultDBIfNeeded(context: HiveContext) = { context.runSqlHive("CREATE DATABASE default") context.runSqlHive("USE default") -- cgit v1.2.3