diff options
author | Takeshi YAMAMURO <linguin.m.s@gmail.com> | 2016-03-30 16:02:48 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2016-03-30 16:02:48 -0700 |
commit | dadf0138b3f6fd618677a2c26f40ab66b7a1139d (patch) | |
tree | 6aede45f7cd76644f45447ef9e962e752bc484a7 /sql | |
parent | ca458618d8ee659ffa9a081083cd475a440fa8ff (diff) | |
download | spark-dadf0138b3f6fd618677a2c26f40ab66b7a1139d.tar.gz spark-dadf0138b3f6fd618677a2c26f40ab66b7a1139d.tar.bz2 spark-dadf0138b3f6fd618677a2c26f40ab66b7a1139d.zip |
[SPARK-14259][SQL] Add a FileSourceStrategy option for limiting #files in a partition
## What changes were proposed in this pull request?
This pr is to add a config to control the maximum number of files as even small files have a non-trivial fixed cost. The current packing can put a lot of small files together which cases straggler tasks.
## How was this patch tested?
I added tests to check if many files get split into partitions in FileSourceStrategySuite.
Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>
Closes #12068 from maropu/SPARK-14259.
Diffstat (limited to 'sql')
3 files changed, 59 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 4448796b16..d6534083c0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -136,7 +136,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { case _ => val maxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes - logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes") + val maxFileNumInPartition = files.sqlContext.conf.filesMaxNumInPartition + logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + + s"max #files: $maxFileNumInPartition") val splitFiles = selectedPartitions.flatMap { partition => partition.files.flatMap { file => @@ -174,7 +176,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { // Assign files to partitions using "First Fit Decreasing" (FFD) // TODO: consider adding a slop factor here? splitFiles.foreach { file => - if (currentSize + file.length > maxSplitBytes) { + if (currentSize + file.length > maxSplitBytes || + currentFiles.length >= maxFileNumInPartition) { closePartition() addFile(file) } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index ca6ba4c643..d06e9086a3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -524,6 +524,11 @@ object SQLConf { doc = "The maximum number of bytes to pack into a single partition when reading files.", isPublic = true) + val FILES_MAX_NUM_IN_PARTITION = longConf("spark.sql.files.maxNumInPartition", + defaultValue = Some(32), + doc = "The maximum number of files to pack into a single partition when reading files.", + isPublic = true) + val EXCHANGE_REUSE_ENABLED = booleanConf("spark.sql.exchange.reuse", defaultValue = Some(true), doc = "When true, the planner will try to find out duplicated exchanges and re-use them.", @@ -581,6 +586,8 @@ class SQLConf extends Serializable with CatalystConf with ParserConf with Loggin def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES) + def filesMaxNumInPartition: Long = getConf(FILES_MAX_NUM_IN_PARTITION) + def useCompression: Boolean = getConf(COMPRESS_CACHED) def useFileScan: Boolean = getConf(USE_FILE_SCAN) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index 1fa15730bc..45620bc965 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -121,6 +121,53 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi } } + test("Unpartitioned table, many files that get split") { + val table = + createTable( + files = Seq( + "file1" -> 2, + "file2" -> 2, + "file3" -> 1, + "file4" -> 1, + "file5" -> 1, + "file6" -> 1)) + + withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "3", + SQLConf.FILES_MAX_NUM_IN_PARTITION.key -> "2") { + checkScan(table.select('c1)) { partitions => + // Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)] + assert(partitions.size == 4, "when checking partitions") + assert(partitions(0).files.size == 1, "when checking partition 1") + assert(partitions(1).files.size == 2, "when checking partition 2") + assert(partitions(2).files.size == 2, "when checking partition 3") + assert(partitions(3).files.size == 1, "when checking partition 4") + + // First partition reads (file1) + assert(partitions(0).files(0).start == 0) + assert(partitions(0).files(0).length == 2) + + // Second partition reads (file2, file3) + assert(partitions(1).files(0).start == 0) + assert(partitions(1).files(0).length == 2) + assert(partitions(1).files(1).start == 0) + assert(partitions(1).files(1).length == 1) + + // Third partition reads (file4, file5) + assert(partitions(2).files(0).start == 0) + assert(partitions(2).files(0).length == 1) + assert(partitions(2).files(1).start == 0) + assert(partitions(2).files(1).length == 1) + + // Final partition reads (file6) + assert(partitions(3).files(0).start == 0) + assert(partitions(3).files(0).length == 1) + } + + checkPartitionSchema(StructType(Nil)) + checkDataSchema(StructType(Nil).add("c1", IntegerType)) + } + } + test("partitioned table") { val table = createTable( |