diff options
author | Parth Brahmbhatt <pbrahmbhatt@netflix.com> | 2016-05-24 20:58:20 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-05-24 20:58:20 -0700 |
commit | 4acababcaba567c85f3be0d5e939d99119b82d1d (patch) | |
tree | 2399ad66af26dcee58de8ba0c4c8ea18fefd07d2 /sql/hive/src/main | |
parent | 14494da87bdf057d2d2f796b962a4d8bc4747d31 (diff) | |
download | spark-4acababcaba567c85f3be0d5e939d99119b82d1d.tar.gz spark-4acababcaba567c85f3be0d5e939d99119b82d1d.tar.bz2 spark-4acababcaba567c85f3be0d5e939d99119b82d1d.zip |
[SPARK-15365][SQL] When table size statistics are not available from metastore, we should fallback to HDFS
## What changes were proposed in this pull request?
Currently if a table is used in join operation we rely on Metastore returned size to calculate if we can convert the operation to Broadcast join. This optimization only kicks in for table's that have the statistics available in metastore. Hive generally rolls over to HDFS if the statistics are not available directly from metastore and this seems like a reasonable choice to adopt given the optimization benefit of using broadcast joins.
## How was this patch tested?
I have executed queries locally to test.
Author: Parth Brahmbhatt <pbrahmbhatt@netflix.com>
Closes #13150 from Parth-Brahmbhatt/SPARK-15365.
Diffstat (limited to 'sql/hive/src/main')
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala | 33 |
1 files changed, 25 insertions, 8 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index 1671228fd9..9c820144ae 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -17,9 +17,12 @@ package org.apache.spark.sql.hive +import java.io.IOException + import scala.collection.JavaConverters._ import com.google.common.base.Objects +import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.hadoop.hive.metastore.{TableType => HiveTableType} import org.apache.hadoop.hive.metastore.api.FieldSchema @@ -114,17 +117,31 @@ private[hive] case class MetastoreRelation( val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE) // TODO: check if this estimate is valid for tables after partition pruning. // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be - // relatively cheap if parameters for the table are populated into the metastore. An - // alternative would be going through Hadoop's FileSystem API, which can be expensive if a lot - // of RPCs are involved. Besides `totalSize`, there are also `numFiles`, `numRows`, - // `rawDataSize` keys (see StatsSetupConst in Hive) that we can look at in the future. + // relatively cheap if parameters for the table are populated into the metastore. + // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys + // (see StatsSetupConst in Hive) that we can look at in the future. BigInt( // When table is external,`totalSize` is always zero, which will influence join strategy // so when `totalSize` is zero, use `rawDataSize` instead - // if the size is still less than zero, we use default size - Option(totalSize).map(_.toLong).filter(_ > 0) - .getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0) - .getOrElse(sparkSession.sessionState.conf.defaultSizeInBytes))) + // if the size is still less than zero, we try to get the file size from HDFS. + // given this is only needed for optimization, if the HDFS call fails we return the default. + if (totalSize != null && totalSize.toLong > 0L) { + totalSize.toLong + } else if (rawDataSize != null && rawDataSize.toLong > 0) { + rawDataSize.toLong + } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) { + try { + val hadoopConf = sparkSession.sessionState.newHadoopConf() + val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf) + fs.getContentSummary(hiveQlTable.getPath).getLength + } catch { + case e: IOException => + logWarning("Failed to get table size from hdfs.", e) + sparkSession.sessionState.conf.defaultSizeInBytes + } + } else { + sparkSession.sessionState.conf.defaultSizeInBytes + }) } ) |