author     Davies Liu <davies@databricks.com>    2016-04-04 14:41:03 -0700
committer  Davies Liu <davies.liu@gmail.com>    2016-04-04 14:41:03 -0700
commit     400b2f863ffaa01a34a8dae1541c61526fef908b (patch)
tree       eb0773854538319d9534c2ebdb36a9eb65f513ae
parent     cc70f174169f45c85d459126a68bbe43c0bec328 (diff)
[SPARK-14259] [SQL] Merging small files together based on the cost of opening
## What changes were proposed in this pull request?

This PR re-does the work of #12068 with a different model, which should work better
for small files of varying sizes: instead of capping the number of files per partition,
each file is charged an estimated "open cost" (in bytes) on top of its size when files
are packed into partitions.

## How was this patch tested?

Updated existing tests. Also ran a query over thousands of small partitioned files
locally with all default settings (so the cost to open a file should be overestimated):
task durations became smaller and smaller, which is the desired behavior (the last few
tasks scheduled are the shortest).

Author: Davies Liu <davies@databricks.com>

Closes #12095 from davies/file_cost.
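To make the new model concrete, here is a minimal, self-contained sketch of the packing logic. It is a simplification of the planner code in the diff below, not the actual Spark implementation; `FileEntry` and `packFiles` are hypothetical names:

```scala
import scala.collection.mutable.ArrayBuffer

case class FileEntry(name: String, length: Long)

def packFiles(
    files: Seq[FileEntry],
    maxSplitBytes: Long,
    openCostInBytes: Long): Seq[Seq[FileEntry]] = {
  val partitions = ArrayBuffer.empty[Seq[FileEntry]]
  val currentFiles = ArrayBuffer.empty[FileEntry]
  var currentSize = 0L

  def closePartition(): Unit = {
    if (currentFiles.nonEmpty) partitions += currentFiles.toList
    currentFiles.clear()
    currentSize = 0L
  }

  // "First Fit Decreasing": visit files largest-first.
  files.sortBy(-_.length).foreach { file =>
    // Close the current partition if this file's raw size no longer fits.
    if (currentSize + file.length > maxSplitBytes) closePartition()
    // Charge the file's size plus the estimated open cost against the budget.
    currentFiles += file
    currentSize += file.length + openCostInBytes
  }
  closePartition()
  partitions.toSeq
}
```

For example, with `maxSplitBytes = 11` and `openCostInBytes = 1`, three 5-byte files pack as `[(5, 5), (5)]`, which is the layout asserted in the updated test below.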
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala       13
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala                               13
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala  14
3 files changed, 21 insertions(+), 19 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index a143ac6aec..618d5a522b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -131,9 +131,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
case _ =>
val maxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes
- val maxFileNumInPartition = files.sqlContext.conf.filesMaxNumInPartition
+ val openCostInBytes = files.sqlContext.conf.filesOpenCostInBytes
logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
- s"max #files: $maxFileNumInPartition")
+ s"open cost is considered as scanning $openCostInBytes bytes.")
val splitFiles = selectedPartitions.flatMap { partition =>
partition.files.flatMap { file =>
@@ -151,7 +151,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
/** Add the given file to the current partition. */
def addFile(file: PartitionedFile): Unit = {
- currentSize += file.length
+ currentSize += file.length + openCostInBytes
currentFiles.append(file)
}
@@ -171,13 +171,10 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
// Assign files to partitions using "First Fit Decreasing" (FFD)
// TODO: consider adding a slop factor here?
splitFiles.foreach { file =>
- if (currentSize + file.length > maxSplitBytes ||
- currentFiles.length >= maxFileNumInPartition) {
+ if (currentSize + file.length > maxSplitBytes) {
closePartition()
- addFile(file)
- } else {
- addFile(file)
}
+ addFile(file)
}
closePartition()
partitions
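The `splitFiles` step above (its body is elided in the hunk) cuts files larger than `maxSplitBytes` into chunks before packing. A hedged sketch of that splitting, with a hypothetical `splitFile` helper:

```scala
// Hypothetical stand-in for the splitFiles step: cut a file into chunks of at
// most maxSplitBytes, returning (name, startOffset, chunkLength) triples.
def splitFile(name: String, length: Long, maxSplitBytes: Long): Seq[(String, Long, Long)] =
  (0L until length by maxSplitBytes).map { offset =>
    (name, offset, math.min(offset + maxSplitBytes, length) - offset)
  }

// splitFile("file1", length = 15, maxSplitBytes = 10)
//   == Seq(("file1", 0, 10), ("file1", 10, 5))
```

This is why the second test below expects a 15-byte file with `maxSplitBytes = 10` to be laid out as chunks (0-10) and (10-15).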
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6cc72fba48..a7c0be63fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -510,10 +510,13 @@ object SQLConf {
doc = "The maximum number of bytes to pack into a single partition when reading files.",
isPublic = true)
- val FILES_MAX_NUM_IN_PARTITION = longConf("spark.sql.files.maxNumInPartition",
- defaultValue = Some(32),
- doc = "The maximum number of files to pack into a single partition when reading files.",
- isPublic = true)
+ val FILES_OPEN_COST_IN_BYTES = longConf("spark.sql.files.openCostInBytes",
+ defaultValue = Some(4 * 1024 * 1024),
+ doc = "The estimated cost to open a file, measured by the number of bytes could be scanned in" +
+ " the same time. This is used when putting multiple files into a partition. It's better to" +
+ " over estimated, then the partitions with small files will be faster than partitions with" +
+ " bigger files (which is scheduled first).",
+ isPublic = false)
val EXCHANGE_REUSE_ENABLED = booleanConf("spark.sql.exchange.reuse",
defaultValue = Some(true),
@@ -572,7 +575,7 @@ class SQLConf extends Serializable with CatalystConf with Logging {
def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
- def filesMaxNumInPartition: Long = getConf(FILES_MAX_NUM_IN_PARTITION)
+ def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES)
def useCompression: Boolean = getConf(COMPRESS_CACHED)
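For completeness, a hedged usage sketch of the new knob using the SQLContext-era API of this commit; the key names and the 4 MB default come from the diff above, while the values set here are purely illustrative:

```scala
// Tune the packing: a larger open-cost estimate packs fewer small files into
// each partition (more parallelism); a smaller one packs more files together.
// 4 MB is the default added in this commit; 8 MB here is just an example.
sqlContext.setConf("spark.sql.files.maxPartitionBytes", (128 * 1024 * 1024).toString)
sqlContext.setConf("spark.sql.files.openCostInBytes", (8 * 1024 * 1024).toString)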
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 717a3a80b7..4446a6881c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -76,7 +76,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
"file2" -> 5,
"file3" -> 5))
- withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10") {
+ withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "11",
+ SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
checkScan(table.select('c1)) { partitions =>
// 5 byte files should be laid out [(5, 5), (5)]
assert(partitions.size == 2, "when checking partitions")
@@ -98,11 +99,12 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
createTable(
files = Seq(
"file1" -> 15,
- "file2" -> 4))
+ "file2" -> 3))
- withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10") {
+ withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10",
+ SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
checkScan(table.select('c1)) { partitions =>
- // Files should be laid out [(0-5), (5-10, 4)]
+ // Files should be laid out [(0-10), (10-15, 3)]
assert(partitions.size == 2, "when checking partitions")
assert(partitions(0).files.size == 1, "when checking partition 1")
assert(partitions(1).files.size == 2, "when checking partition 2")
@@ -132,8 +134,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
"file5" -> 1,
"file6" -> 1))
- withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "3",
- SQLConf.FILES_MAX_NUM_IN_PARTITION.key -> "2") {
+ withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "4",
+ SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
checkScan(table.select('c1)) { partitions =>
// Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)]
assert(partitions.size == 4, "when checking partitions")
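Finally, a worked check of this last test using the `packFiles` sketch from the top of this page. The hunk only shows the sizes of file5 and file6; the sizes assumed here for file1-file4 (2, 2, 1, 1) are hypothetical, chosen so the expected layout is reproduced:

```scala
val files = Seq(
  "file1" -> 2L, "file2" -> 2L, "file3" -> 1L,  // assumed sizes (not in the hunk)
  "file4" -> 1L, "file5" -> 1L, "file6" -> 1L   // sizes shown in the hunk
).map { case (name, len) => FileEntry(name, len) }

val parts = packFiles(files, maxSplitBytes = 4L, openCostInBytes = 1L)
// parts.map(_.map(_.name)) ==
//   Seq(Seq("file1"), Seq("file2", "file3"), Seq("file4", "file5"), Seq("file6"))
```

Tracing the loop: file1 fills partition 1 once its size plus the open cost is charged; file2 and file3 together exhaust the budget of partition 2; and so on, yielding the four partitions the test asserts.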