author     Takeshi YAMAMURO <linguin.m.s@gmail.com>   2016-03-30 16:02:48 -0700
committer  Yin Huai <yhuai@databricks.com>            2016-03-30 16:02:48 -0700
commit     dadf0138b3f6fd618677a2c26f40ab66b7a1139d (patch)
tree       6aede45f7cd76644f45447ef9e962e752bc484a7 /sql/core
parent     ca458618d8ee659ffa9a081083cd475a440fa8ff (diff)
[SPARK-14259][SQL] Add a FileSourceStrategy option for limiting #files in a partition
## What changes were proposed in this pull request?

This PR adds a config to control the maximum number of files packed into a single partition, since even small files have a non-trivial fixed cost. The current packing can put many small files together, which causes straggler tasks.

## How was this patch tested?

Added tests in FileSourceStrategySuite to check that many files get split across partitions.

Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>

Closes #12068 from maropu/SPARK-14259.
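For context, a minimal sketch of how the new cap could be set from user code (assumptions: a SQLContext session of this era; the input path and column name are hypothetical, only the two spark.sql.files.* keys come from this patch):

    // Cap both the byte size and the file count packed into one scan partition.
    // "/data/many-small-files" and column "c1" are made-up examples.
    sqlContext.setConf("spark.sql.files.maxPartitionBytes", (128 * 1024 * 1024).toString)
    sqlContext.setConf("spark.sql.files.maxNumInPartition", "32")

    val df = sqlContext.read.parquet("/data/many-small-files")
    df.select("c1").count()   // each scan partition now reads at most 32 files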
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala       |  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala                               |  7
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala  | 47
3 files changed, 59 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 4448796b16..d6534083c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -136,7 +136,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
case _ =>
val maxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes
- logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes")
+ val maxFileNumInPartition = files.sqlContext.conf.filesMaxNumInPartition
+ logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
+ s"max #files: $maxFileNumInPartition")
val splitFiles = selectedPartitions.flatMap { partition =>
partition.files.flatMap { file =>
@@ -174,7 +176,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
// Assign files to partitions using "First Fit Decreasing" (FFD)
// TODO: consider adding a slop factor here?
splitFiles.foreach { file =>
- if (currentSize + file.length > maxSplitBytes) {
+ if (currentSize + file.length > maxSplitBytes ||
+ currentFiles.length >= maxFileNumInPartition) {
closePartition()
addFile(file)
} else {
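To make the new packing rule easier to read outside the diff, here is a self-contained sketch of the size-and-count constrained "First Fit Decreasing" assignment; PartitionedFile and FilePartition below are simplified stand-ins for Spark's internal classes, not the real definitions:

    // Simplified stand-ins, for illustration only.
    case class PartitionedFile(path: String, start: Long, length: Long)
    case class FilePartition(index: Int, files: Seq[PartitionedFile])

    def pack(
        splitFiles: Seq[PartitionedFile],    // assumed sorted by decreasing length
        maxSplitBytes: Long,
        maxFileNumInPartition: Long): Seq[FilePartition] = {
      val partitions = scala.collection.mutable.ArrayBuffer.empty[FilePartition]
      var currentFiles = Vector.empty[PartitionedFile]
      var currentSize = 0L

      // Emit the partition being built, if any, and reset the accumulators.
      def closePartition(): Unit = {
        if (currentFiles.nonEmpty) {
          partitions += FilePartition(partitions.size, currentFiles)
        }
        currentFiles = Vector.empty
        currentSize = 0L
      }

      splitFiles.foreach { file =>
        // Start a new partition once either the byte cap or the file-count cap is hit.
        if (currentSize + file.length > maxSplitBytes ||
            currentFiles.length >= maxFileNumInPartition) {
          closePartition()
        }
        currentFiles :+= file
        currentSize += file.length
      }
      closePartition()
      partitions.toSeq
    }

With maxSplitBytes = 3 and maxFileNumInPartition = 2, files of sizes 2, 2, 1, 1, 1, 1 end up as [(file1), (file2, file3), (file4, file5), (file6)], which is exactly the layout the new FileSourceStrategySuite test below asserts.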
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index ca6ba4c643..d06e9086a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -524,6 +524,11 @@ object SQLConf {
doc = "The maximum number of bytes to pack into a single partition when reading files.",
isPublic = true)
+ val FILES_MAX_NUM_IN_PARTITION = longConf("spark.sql.files.maxNumInPartition",
+ defaultValue = Some(32),
+ doc = "The maximum number of files to pack into a single partition when reading files.",
+ isPublic = true)
+
val EXCHANGE_REUSE_ENABLED = booleanConf("spark.sql.exchange.reuse",
defaultValue = Some(true),
doc = "When true, the planner will try to find out duplicated exchanges and re-use them.",
@@ -581,6 +586,8 @@ class SQLConf extends Serializable with CatalystConf with ParserConf with Loggin
def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
+ def filesMaxNumInPartition: Long = getConf(FILES_MAX_NUM_IN_PARTITION)
+
def useCompression: Boolean = getConf(COMPRESS_CACHED)
def useFileScan: Boolean = getConf(USE_FILE_SCAN)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 1fa15730bc..45620bc965 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -121,6 +121,53 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
}
}
+ test("Unpartitioned table, many files that get split") {
+ val table =
+ createTable(
+ files = Seq(
+ "file1" -> 2,
+ "file2" -> 2,
+ "file3" -> 1,
+ "file4" -> 1,
+ "file5" -> 1,
+ "file6" -> 1))
+
+ withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "3",
+ SQLConf.FILES_MAX_NUM_IN_PARTITION.key -> "2") {
+ checkScan(table.select('c1)) { partitions =>
+ // Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)]
+ assert(partitions.size == 4, "when checking partitions")
+ assert(partitions(0).files.size == 1, "when checking partition 1")
+ assert(partitions(1).files.size == 2, "when checking partition 2")
+ assert(partitions(2).files.size == 2, "when checking partition 3")
+ assert(partitions(3).files.size == 1, "when checking partition 4")
+
+ // First partition reads (file1)
+ assert(partitions(0).files(0).start == 0)
+ assert(partitions(0).files(0).length == 2)
+
+ // Second partition reads (file2, file3)
+ assert(partitions(1).files(0).start == 0)
+ assert(partitions(1).files(0).length == 2)
+ assert(partitions(1).files(1).start == 0)
+ assert(partitions(1).files(1).length == 1)
+
+ // Third partition reads (file4, file5)
+ assert(partitions(2).files(0).start == 0)
+ assert(partitions(2).files(0).length == 1)
+ assert(partitions(2).files(1).start == 0)
+ assert(partitions(2).files(1).length == 1)
+
+ // Final partition reads (file6)
+ assert(partitions(3).files(0).start == 0)
+ assert(partitions(3).files(0).length == 1)
+ }
+
+ checkPartitionSchema(StructType(Nil))
+ checkDataSchema(StructType(Nil).add("c1", IntegerType))
+ }
+ }
+
test("partitioned table") {
val table =
createTable(