author | Takeshi YAMAMURO <linguin.m.s@gmail.com> | 2016-03-30 16:02:48 -0700
---|---|---
committer | Yin Huai <yhuai@databricks.com> | 2016-03-30 16:02:48 -0700
commit | dadf0138b3f6fd618677a2c26f40ab66b7a1139d | (patch)
tree | 6aede45f7cd76644f45447ef9e962e752bc484a7 | /sql/core/src/main
parent | ca458618d8ee659ffa9a081083cd475a440fa8ff | (diff)
[SPARK-14259][SQL] Add a FileSourceStrategy option for limiting #files in a partition
## What changes were proposed in this pull request?
This PR adds a config to cap the number of files packed into a single partition, since even small files have a non-trivial fixed per-file cost. The current bin packing can put a lot of small files together, which causes straggler tasks. A standalone sketch of the packing policy is given below.
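For illustration, here is a minimal, self-contained Scala sketch of the packing policy this commit changes; it is not the Spark source itself, and the names `FileSlice` and `packFiles` are hypothetical stand-ins. Files are assigned greedily, and the open partition is closed once the next file would exceed the byte budget or (new in this commit) the partition already holds the maximum number of files.

```scala
// Hypothetical stand-in for one file (or file slice) to be scanned.
case class FileSlice(path: String, length: Long)

// Greedy packing with two limits: total bytes per partition and, per this
// commit, total files per partition. In the real strategy, splitFiles is
// pre-sorted by decreasing length (the "Decreasing" in First Fit Decreasing).
def packFiles(
    splitFiles: Seq[FileSlice],
    maxSplitBytes: Long,
    maxFileNumInPartition: Long): Seq[Seq[FileSlice]] = {
  val partitions = Seq.newBuilder[Seq[FileSlice]]
  var currentFiles = Vector.empty[FileSlice]
  var currentSize = 0L

  def closePartition(): Unit = {
    if (currentFiles.nonEmpty) partitions += currentFiles
    currentFiles = Vector.empty
    currentSize = 0L
  }

  // Same shape as the patched condition: close the open partition when the
  // next file would blow the byte budget OR the partition is already full.
  splitFiles.foreach { file =>
    if (currentSize + file.length > maxSplitBytes ||
        currentFiles.length >= maxFileNumInPartition) {
      closePartition()
    }
    currentFiles :+= file
    currentSize += file.length
  }
  closePartition()
  partitions.result()
}

// Example call with the defaults from this patch (128 MB, 32 files):
// packFiles(files, maxSplitBytes = 128L << 20, maxFileNumInPartition = 32L)
```

With the default limit of 32 files, a scan over 10,000 tiny files yields at least 313 partitions (⌈10000/32⌉) instead of a handful of oversized stragglers.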
## How was this patch tested?
Added tests to FileSourceStrategySuite to check that a large number of files is split across multiple partitions.
Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>
Closes #12068 from maropu/SPARK-14259.
Diffstat (limited to 'sql/core/src/main')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala | 7
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 7
2 files changed, 12 insertions, 2 deletions
```diff
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 4448796b16..d6534083c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -136,7 +136,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       case _ =>
         val maxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes
-        logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes")
+        val maxFileNumInPartition = files.sqlContext.conf.filesMaxNumInPartition
+        logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
+          s"max #files: $maxFileNumInPartition")
 
         val splitFiles = selectedPartitions.flatMap { partition =>
           partition.files.flatMap { file =>
@@ -174,7 +176,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
         // Assign files to partitions using "First Fit Decreasing" (FFD)
         // TODO: consider adding a slop factor here?
         splitFiles.foreach { file =>
-          if (currentSize + file.length > maxSplitBytes) {
+          if (currentSize + file.length > maxSplitBytes ||
+              currentFiles.length >= maxFileNumInPartition) {
             closePartition()
             addFile(file)
           } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index ca6ba4c643..d06e9086a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -524,6 +524,11 @@ object SQLConf {
     doc = "The maximum number of bytes to pack into a single partition when reading files.",
     isPublic = true)
 
+  val FILES_MAX_NUM_IN_PARTITION = longConf("spark.sql.files.maxNumInPartition",
+    defaultValue = Some(32),
+    doc = "The maximum number of files to pack into a single partition when reading files.",
+    isPublic = true)
+
   val EXCHANGE_REUSE_ENABLED = booleanConf("spark.sql.exchange.reuse",
     defaultValue = Some(true),
     doc = "When true, the planner will try to find out duplicated exchanges and re-use them.",
@@ -581,6 +586,8 @@ class SQLConf extends Serializable with CatalystConf with ParserConf with Loggin
 
   def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
 
+  def filesMaxNumInPartition: Long = getConf(FILES_MAX_NUM_IN_PARTITION)
+
   def useCompression: Boolean = getConf(COMPRESS_CACHED)
 
   def useFileScan: Boolean = getConf(USE_FILE_SCAN)
```
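For reference, a hedged usage sketch: the config key is taken verbatim from the patch above, while the surrounding setup (an existing `sqlContext`) is generic Spark API of this era.

```scala
// Raise the per-partition file cap from the default of 32 at runtime;
// SQLContext.setConf takes string key/value pairs.
sqlContext.setConf("spark.sql.files.maxNumInPartition", "64")

// Equivalently at submit time (assumed invocation, application jar elided):
//   spark-submit --conf spark.sql.files.maxNumInPartition=64 ...
```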