author     Parth Brahmbhatt <pbrahmbhatt@netflix.com>   2016-05-24 20:58:20 -0700
committer  Reynold Xin <rxin@databricks.com>            2016-05-24 20:58:20 -0700
commit     4acababcaba567c85f3be0d5e939d99119b82d1d (patch)
tree       2399ad66af26dcee58de8ba0c4c8ea18fefd07d2 /sql
parent     14494da87bdf057d2d2f796b962a4d8bc4747d31 (diff)
download   spark-4acababcaba567c85f3be0d5e939d99119b82d1d.tar.gz
           spark-4acababcaba567c85f3be0d5e939d99119b82d1d.tar.bz2
           spark-4acababcaba567c85f3be0d5e939d99119b82d1d.zip
[SPARK-15365][SQL] When table size statistics are not available from metastore, we should fallback to HDFS
## What changes were proposed in this pull request?

Currently, when a table is used in a join, we rely on the size reported by the metastore to decide whether the operation can be converted to a broadcast join. This optimization only kicks in for tables whose statistics are available in the metastore. Hive generally falls back to HDFS when statistics are not available directly from the metastore, and that seems like a reasonable choice to adopt here given the optimization benefit of broadcast joins.

## How was this patch tested?

I have executed queries locally to test.

Author: Parth Brahmbhatt <pbrahmbhatt@netflix.com>

Closes #13150 from Parth-Brahmbhatt/SPARK-15365.
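The new behavior is opt-in: the flag introduced below defaults to `false`, so a session has to enable it before it affects join planning. A minimal sketch of enabling it from user code (assuming an existing Hive-enabled `SparkSession` named `spark`; the table names are hypothetical):

```scala
// Opt in to falling back to HDFS for table-size statistics (defaults to false).
spark.conf.set("spark.sql.enableFallBackToHdfsForStats", "true")

// With the flag on, a Hive table that lacks metastore statistics can still be
// sized from its files and, if under spark.sql.autoBroadcastJoinThreshold,
// broadcast in a join.
val joined = spark.table("large_facts").join(spark.table("small_dim"), "id")
joined.explain()  // expect a broadcast join when small_dim is small enough
```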
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala        |  9
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala  | 33
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala    | 50
3 files changed, 83 insertions, 9 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b91518acce..4efefdacab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -113,6 +113,13 @@ object SQLConf {
.longConf
.createWithDefault(10L * 1024 * 1024)
+ val ENABLE_FALL_BACK_TO_HDFS_FOR_STATS =
+ SQLConfigBuilder("spark.sql.enableFallBackToHdfsForStats")
+ .doc("If the table statistics are not available from table metadata enable fall back to hdfs." +
+ " This is useful in determining if a table is small enough to use auto broadcast joins.")
+ .booleanConf
+ .createWithDefault(false)
+
val DEFAULT_SIZE_IN_BYTES = SQLConfigBuilder("spark.sql.defaultSizeInBytes")
.internal()
.doc("The default table size used in query planning. By default, it is set to a larger " +
@@ -603,6 +610,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def autoBroadcastJoinThreshold: Long = getConf(AUTO_BROADCASTJOIN_THRESHOLD)
+ def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS)
+
def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN)
def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED)
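For reference, the new flag only changes how a relation's size estimate is obtained; the broadcast decision itself still compares that estimate against `autoBroadcastJoinThreshold` (10 MB by default, per the setting above). A rough, self-contained sketch of that comparison (illustrative only; the helper and parameter names are hypothetical, not Spark internals):

```scala
// Minimal sketch: a relation qualifies for auto broadcast when its estimated
// size (possibly obtained via the new HDFS fallback) fits under the threshold.
def canBroadcast(estimatedSizeInBytes: Long, autoBroadcastJoinThreshold: Long): Boolean =
  estimatedSizeInBytes > 0 && estimatedSizeInBytes <= autoBroadcastJoinThreshold

// Example: a 5 MB table against the default 10 MB threshold qualifies.
println(canBroadcast(5L * 1024 * 1024, 10L * 1024 * 1024))  // true
```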
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index 1671228fd9..9c820144ae 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -17,9 +17,12 @@
package org.apache.spark.sql.hive
+import java.io.IOException
+
import scala.collection.JavaConverters._
import com.google.common.base.Objects
+import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.FieldSchema
@@ -114,17 +117,31 @@ private[hive] case class MetastoreRelation(
val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
// TODO: check if this estimate is valid for tables after partition pruning.
// NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
- // relatively cheap if parameters for the table are populated into the metastore. An
- // alternative would be going through Hadoop's FileSystem API, which can be expensive if a lot
- // of RPCs are involved. Besides `totalSize`, there are also `numFiles`, `numRows`,
- // `rawDataSize` keys (see StatsSetupConst in Hive) that we can look at in the future.
+ // relatively cheap if parameters for the table are populated into the metastore.
+ // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
+ // (see StatsSetupConst in Hive) that we can look at in the future.
BigInt(
// When table is external,`totalSize` is always zero, which will influence join strategy
// so when `totalSize` is zero, use `rawDataSize` instead
- // if the size is still less than zero, we use default size
- Option(totalSize).map(_.toLong).filter(_ > 0)
- .getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
- .getOrElse(sparkSession.sessionState.conf.defaultSizeInBytes)))
+ // if the size is still less than zero, we try to get the file size from HDFS.
+ // given this is only needed for optimization, if the HDFS call fails we return the default.
+ if (totalSize != null && totalSize.toLong > 0L) {
+ totalSize.toLong
+ } else if (rawDataSize != null && rawDataSize.toLong > 0) {
+ rawDataSize.toLong
+ } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+ try {
+ val hadoopConf = sparkSession.sessionState.newHadoopConf()
+ val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
+ fs.getContentSummary(hiveQlTable.getPath).getLength
+ } catch {
+ case e: IOException =>
+ logWarning("Failed to get table size from hdfs.", e)
+ sparkSession.sessionState.conf.defaultSizeInBytes
+ }
+ } else {
+ sparkSession.sessionState.conf.defaultSizeInBytes
+ })
}
)
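The heart of the fallback above is a single Hadoop `FileSystem` call on the table's location. A standalone sketch of that call, using the same best-effort error handling as the patch (assumptions: you already hold a Hadoop `Configuration` and the table's `Path`; `defaultSize` is a stand-in for `spark.sql.defaultSizeInBytes`):

```scala
import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sum the bytes of all files under a table's location, returning a default
// when the file system call fails -- statistics are only an optimization hint.
def estimateSizeFromFs(tablePath: Path, hadoopConf: Configuration, defaultSize: Long): Long = {
  try {
    val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
    fs.getContentSummary(tablePath).getLength  // total length of files under the path
  } catch {
    case _: IOException => defaultSize
  }
}
```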
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 1a7b6c0112..f8e00a35a3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.hive
+import java.io.{File, PrintWriter}
+
import scala.reflect.ClassTag
import org.apache.spark.sql.{QueryTest, Row}
@@ -25,8 +27,9 @@ import org.apache.spark.sql.execution.command.AnalyzeTableCommand
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
-class StatisticsSuite extends QueryTest with TestHiveSingleton {
+class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
import hiveContext.sql
test("parse analyze commands") {
@@ -68,6 +71,51 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
classOf[AnalyzeTableCommand])
}
+ test("MetastoreRelations fallback to HDFS for size estimation") {
+ val enableFallBackToHdfsForStats = hiveContext.conf.fallBackToHdfsForStatsEnabled
+ try {
+ withTempDir { tempDir =>
+
+ // EXTERNAL OpenCSVSerde table pointing to LOCATION
+
+ val file1 = new File(tempDir + "/data1")
+ val writer1 = new PrintWriter(file1)
+ writer1.write("1,2")
+ writer1.close()
+
+ val file2 = new File(tempDir + "/data2")
+ val writer2 = new PrintWriter(file2)
+ writer2.write("1,2")
+ writer2.close()
+
+ sql(
+ s"""CREATE EXTERNAL TABLE csv_table(page_id INT, impressions INT)
+ ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
+ WITH SERDEPROPERTIES (
+ \"separatorChar\" = \",\",
+ \"quoteChar\" = \"\\\"\",
+ \"escapeChar\" = \"\\\\\")
+ LOCATION '$tempDir'
+ """)
+
+ hiveContext.setConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS, true)
+
+ val relation = hiveContext.sessionState.catalog.lookupRelation(TableIdentifier("csv_table"))
+ .asInstanceOf[MetastoreRelation]
+
+ val properties = relation.hiveQlTable.getParameters
+ assert(properties.get("totalSize").toLong <= 0, "external table totalSize must be <= 0")
+ assert(properties.get("rawDataSize").toLong <= 0, "external table rawDataSize must be <= 0")
+
+ val sizeInBytes = relation.statistics.sizeInBytes
+ assert(sizeInBytes === BigInt(file1.length() + file2.length()))
+ }
+ } finally {
+ hiveContext.setConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS, enableFallBackToHdfsForStats)
+ sql("DROP TABLE csv_table ")
+ }
+ }
+
ignore("analyze MetastoreRelations") {
def queryTotalSize(tableName: String): BigInt =
hiveContext.sessionState.catalog.lookupRelation(