author: Parth Brahmbhatt <pbrahmbhatt@netflix.com>  2016-05-24 20:58:20 -0700
committer: Reynold Xin <rxin@databricks.com>  2016-05-24 20:58:20 -0700
commit: 4acababcaba567c85f3be0d5e939d99119b82d1d (patch)
tree: 2399ad66af26dcee58de8ba0c4c8ea18fefd07d2 /sql/hive/src/main
parent: 14494da87bdf057d2d2f796b962a4d8bc4747d31 (diff)
download: spark-4acababcaba567c85f3be0d5e939d99119b82d1d.tar.gz
          spark-4acababcaba567c85f3be0d5e939d99119b82d1d.tar.bz2
          spark-4acababcaba567c85f3be0d5e939d99119b82d1d.zip
[SPARK-15365][SQL] When table size statistics are not available from metastore, we should fallback to HDFS
## What changes were proposed in this pull request?

Currently, if a table is used in a join, we rely on the size reported by the metastore to decide whether the join can be converted to a broadcast join. This optimization only kicks in for tables whose statistics are available in the metastore. Hive generally falls back to HDFS when statistics are not available directly from the metastore, and that seems like a reasonable choice to adopt here given the optimization benefit of broadcast joins.

## How was this patch tested?

I have executed queries locally to test.

Author: Parth Brahmbhatt <pbrahmbhatt@netflix.com>

Closes #13150 from Parth-Brahmbhatt/SPARK-15365.
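For context, a minimal sketch (not part of this commit) of the fallback the description refers to: when the metastore reports no usable `totalSize` or `rawDataSize`, the table's root path can be sized with Hadoop's FileSystem API. The helper name and parameters below are illustrative, not Spark's API.

```scala
import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative helper: estimate a table's size in bytes, preferring
// metastore-provided stats and falling back to an HDFS content summary.
def estimateSizeInBytes(
    totalSize: Option[Long],       // metastore `totalSize`, if populated
    rawDataSize: Option[Long],     // metastore `rawDataSize`, if populated
    tablePath: Path,               // root location of the table's data
    hadoopConf: Configuration,
    defaultSizeInBytes: Long): Long = {
  totalSize.filter(_ > 0L)
    .orElse(rawDataSize.filter(_ > 0L))
    .getOrElse {
      try {
        // Same kind of call the patch uses: sum the lengths of all files under the path.
        val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
        fs.getContentSummary(tablePath).getLength
      } catch {
        case _: IOException =>
          // The size is only an optimizer hint, so a failed HDFS call degrades
          // to the configured default instead of failing the query.
          defaultSizeInBytes
      }
    }
}
```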
Diffstat (limited to 'sql/hive/src/main')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala  33
1 file changed, 25 insertions(+), 8 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index 1671228fd9..9c820144ae 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -17,9 +17,12 @@
package org.apache.spark.sql.hive
+import java.io.IOException
+
import scala.collection.JavaConverters._
import com.google.common.base.Objects
+import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.FieldSchema
@@ -114,17 +117,31 @@ private[hive] case class MetastoreRelation(
val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
// TODO: check if this estimate is valid for tables after partition pruning.
// NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
- // relatively cheap if parameters for the table are populated into the metastore. An
- // alternative would be going through Hadoop's FileSystem API, which can be expensive if a lot
- // of RPCs are involved. Besides `totalSize`, there are also `numFiles`, `numRows`,
- // `rawDataSize` keys (see StatsSetupConst in Hive) that we can look at in the future.
+ // relatively cheap if parameters for the table are populated into the metastore.
+ // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
+ // (see StatsSetupConst in Hive) that we can look at in the future.
BigInt(
// When table is external,`totalSize` is always zero, which will influence join strategy
// so when `totalSize` is zero, use `rawDataSize` instead
- // if the size is still less than zero, we use default size
- Option(totalSize).map(_.toLong).filter(_ > 0)
- .getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
- .getOrElse(sparkSession.sessionState.conf.defaultSizeInBytes)))
+ // if the size is still less than zero, we try to get the file size from HDFS.
+ // given this is only needed for optimization, if the HDFS call fails we return the default.
+ if (totalSize != null && totalSize.toLong > 0L) {
+ totalSize.toLong
+ } else if (rawDataSize != null && rawDataSize.toLong > 0) {
+ rawDataSize.toLong
+ } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+ try {
+ val hadoopConf = sparkSession.sessionState.newHadoopConf()
+ val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
+ fs.getContentSummary(hiveQlTable.getPath).getLength
+ } catch {
+ case e: IOException =>
+ logWarning("Failed to get table size from hdfs.", e)
+ sparkSession.sessionState.conf.defaultSizeInBytes
+ }
+ } else {
+ sparkSession.sessionState.conf.defaultSizeInBytes
+ })
}
)
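Usage note: the new branch is gated by `sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled`. The snippet below is a hedged sketch of enabling what is assumed to be the corresponding SQL configuration key (`spark.sql.statistics.fallBackToHdfs`; verify the exact key for your Spark version) and checking that a small Hive table without metastore statistics is planned as a broadcast join. The table names are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hdfs-stats-fallback-demo")
  // Assumed key backing `fallBackToHdfsForStatsEnabled`; confirm for your version.
  .config("spark.sql.statistics.fallBackToHdfs", "true")
  // Tables estimated below this many bytes become broadcast-join candidates.
  .config("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024)
  .enableHiveSupport()
  .getOrCreate()

// Hypothetical Hive tables: `small_dim` has no metastore size statistics, so with
// the fallback enabled its size is read from HDFS rather than the default estimate.
val joined = spark.table("small_dim").join(spark.table("large_fact"), "id")
joined.explain() // expect a BroadcastHashJoin when small_dim fits under the threshold
```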