Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala       |  9
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala | 33
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala   | 50
3 files changed, 83 insertions, 9 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b91518acce..4efefdacab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -113,6 +113,13 @@ object SQLConf {
.longConf
.createWithDefault(10L * 1024 * 1024)
+ val ENABLE_FALL_BACK_TO_HDFS_FOR_STATS =
+ SQLConfigBuilder("spark.sql.enableFallBackToHdfsForStats")
+ .doc("If the table statistics are not available from table metadata enable fall back to hdfs." +
+ " This is useful in determining if a table is small enough to use auto broadcast joins.")
+ .booleanConf
+ .createWithDefault(false)
+
val DEFAULT_SIZE_IN_BYTES = SQLConfigBuilder("spark.sql.defaultSizeInBytes")
.internal()
.doc("The default table size used in query planning. By default, it is set to a larger " +
@@ -603,6 +610,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def autoBroadcastJoinThreshold: Long = getConf(AUTO_BROADCASTJOIN_THRESHOLD)
+ def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS)
+
def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN)
def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED)
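
The new flag is a plain boolean SQL conf, so it can be toggled per session. The snippet below is a usage sketch and not part of this patch; it assumes a SparkSession named `spark` and shows the flag next to the broadcast-join threshold the size estimate feeds into.

// Usage sketch (assumption: a SparkSession named `spark` already exists).
// Enable the HDFS fallback for size estimation when metastore stats are missing.
spark.conf.set("spark.sql.enableFallBackToHdfsForStats", "true")

// The estimate feeds the broadcast-join decision: tables whose estimated size is
// at or below this threshold (10 MB by default, per the conf above) become
// broadcast candidates.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", (10L * 1024 * 1024).toString)
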
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index 1671228fd9..9c820144ae 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -17,9 +17,12 @@
package org.apache.spark.sql.hive
+import java.io.IOException
+
import scala.collection.JavaConverters._
import com.google.common.base.Objects
+import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.FieldSchema
@@ -114,17 +117,31 @@ private[hive] case class MetastoreRelation(
val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
// TODO: check if this estimate is valid for tables after partition pruning.
// NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
- // relatively cheap if parameters for the table are populated into the metastore. An
- // alternative would be going through Hadoop's FileSystem API, which can be expensive if a lot
- // of RPCs are involved. Besides `totalSize`, there are also `numFiles`, `numRows`,
- // `rawDataSize` keys (see StatsSetupConst in Hive) that we can look at in the future.
+ // relatively cheap if parameters for the table are populated into the metastore.
+ // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
+ // (see StatsSetupConst in Hive) that we can look at in the future.
BigInt(
// When table is external,`totalSize` is always zero, which will influence join strategy
// so when `totalSize` is zero, use `rawDataSize` instead
- // if the size is still less than zero, we use default size
- Option(totalSize).map(_.toLong).filter(_ > 0)
- .getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
- .getOrElse(sparkSession.sessionState.conf.defaultSizeInBytes)))
+ // if the size is still not positive, we try to get the file size from HDFS.
+ // given this is only needed for optimization, if the HDFS call fails we return the default.
+ if (totalSize != null && totalSize.toLong > 0L) {
+ totalSize.toLong
+ } else if (rawDataSize != null && rawDataSize.toLong > 0) {
+ rawDataSize.toLong
+ } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+ try {
+ val hadoopConf = sparkSession.sessionState.newHadoopConf()
+ val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
+ fs.getContentSummary(hiveQlTable.getPath).getLength
+ } catch {
+ case e: IOException =>
+ logWarning("Failed to get table size from hdfs.", e)
+ sparkSession.sessionState.conf.defaultSizeInBytes
+ }
+ } else {
+ sparkSession.sessionState.conf.defaultSizeInBytes
+ })
}
)
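
The fallback path above asks the Hadoop FileSystem for the aggregate length of everything under the table location, which is exactly the "expensive if a lot of RPCs are involved" route the old comment warned about. The snippet below is a standalone sketch of that call, not part of the patch; the path `/tmp/some_table` and the bare `Configuration` are illustrative assumptions.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object TableSizeFromFs {
  def main(args: Array[String]): Unit = {
    // Hypothetical table location; in the patch this comes from hiveQlTable.getPath.
    val tablePath = new Path("/tmp/some_table")
    // Resolve the FileSystem that owns the path (HDFS, local FS, etc.).
    val fs: FileSystem = tablePath.getFileSystem(new Configuration())
    // getContentSummary walks the directory tree; getLength is the total size in
    // bytes of all files beneath the path, which is why many small files make it costly.
    val sizeInBytes: Long = fs.getContentSummary(tablePath).getLength
    println(s"Estimated size of $tablePath: $sizeInBytes bytes")
  }
}
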
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 1a7b6c0112..f8e00a35a3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.hive
+import java.io.{File, PrintWriter}
+
import scala.reflect.ClassTag
import org.apache.spark.sql.{QueryTest, Row}
@@ -25,8 +27,9 @@ import org.apache.spark.sql.execution.command.AnalyzeTableCommand
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
-class StatisticsSuite extends QueryTest with TestHiveSingleton {
+class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
import hiveContext.sql
test("parse analyze commands") {
@@ -68,6 +71,51 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
classOf[AnalyzeTableCommand])
}
+ test("MetastoreRelations fallback to HDFS for size estimation") {
+ val enableFallBackToHdfsForStats = hiveContext.conf.fallBackToHdfsForStatsEnabled
+ try {
+ withTempDir { tempDir =>
+
+ // EXTERNAL OpenCSVSerde table pointing to LOCATION
+
+ val file1 = new File(tempDir + "/data1")
+ val writer1 = new PrintWriter(file1)
+ writer1.write("1,2")
+ writer1.close()
+
+ val file2 = new File(tempDir + "/data2")
+ val writer2 = new PrintWriter(file2)
+ writer2.write("1,2")
+ writer2.close()
+
+ sql(
+ s"""CREATE EXTERNAL TABLE csv_table(page_id INT, impressions INT)
+ ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
+ WITH SERDEPROPERTIES (
+ \"separatorChar\" = \",\",
+ \"quoteChar\" = \"\\\"\",
+ \"escapeChar\" = \"\\\\\")
+ LOCATION '$tempDir'
+ """)
+
+ hiveContext.setConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS, true)
+
+ val relation = hiveContext.sessionState.catalog.lookupRelation(TableIdentifier("csv_table"))
+ .asInstanceOf[MetastoreRelation]
+
+ val properties = relation.hiveQlTable.getParameters
+ assert(properties.get("totalSize").toLong <= 0, "external table totalSize must be <= 0")
+ assert(properties.get("rawDataSize").toLong <= 0, "external table rawDataSize must be <= 0")
+
+ val sizeInBytes = relation.statistics.sizeInBytes
+ assert(sizeInBytes === BigInt(file1.length() + file2.length()))
+ }
+ } finally {
+ hiveContext.setConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS, enableFallBackToHdfsForStats)
+ sql("DROP TABLE csv_table ")
+ }
+ }
+
ignore("analyze MetastoreRelations") {
def queryTotalSize(tableName: String): BigInt =
hiveContext.sessionState.catalog.lookupRelation(