From 400b2f863ffaa01a34a8dae1541c61526fef908b Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Mon, 4 Apr 2016 14:41:03 -0700
Subject: [SPARK-14259] [SQL] Merging small files together based on the cost of opening

## What changes were proposed in this pull request?

This PR basically redoes the work in #12068, but with a different model that should work better in the case of small files with different sizes.

## How was this patch tested?

Updated existing tests.

Also ran a query on thousands of small partitioned files locally, with all default settings (so the cost to open a file is overestimated); the durations of the tasks become smaller and smaller, which is good (the last few tasks will be the shortest).

Author: Davies Liu

Closes #12095 from davies/file_cost.
---
 .../sql/execution/datasources/FileSourceStrategy.scala     | 13 +++++--------
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 13 ++++++++-----
 .../execution/datasources/FileSourceStrategySuite.scala    | 14 ++++++++------
 3 files changed, 21 insertions(+), 19 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index a143ac6aec..618d5a522b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -131,9 +131,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       case _ =>
         val maxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes
-        val maxFileNumInPartition = files.sqlContext.conf.filesMaxNumInPartition
+        val openCostInBytes = files.sqlContext.conf.filesOpenCostInBytes
         logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
-          s"max #files: $maxFileNumInPartition")
+          s"open cost is considered as scanning $openCostInBytes bytes.")

         val splitFiles = selectedPartitions.flatMap { partition =>
           partition.files.flatMap { file =>
@@ -151,7 +151,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {

         /** Add the given file to the current partition. */
         def addFile(file: PartitionedFile): Unit = {
-          currentSize += file.length
+          currentSize += file.length + openCostInBytes
           currentFiles.append(file)
         }

@@ -171,13 +171,10 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
         // Assign files to partitions using "First Fit Decreasing" (FFD)
         // TODO: consider adding a slop factor here?
         splitFiles.foreach { file =>
-          if (currentSize + file.length > maxSplitBytes ||
-              currentFiles.length >= maxFileNumInPartition) {
+          if (currentSize + file.length > maxSplitBytes) {
             closePartition()
-            addFile(file)
-          } else {
-            addFile(file)
           }
+          addFile(file)
         }
         closePartition()
         partitions
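To make the new model easier to follow outside the diff: each file charges a fixed openCostInBytes against the partition's byte budget on top of its actual length, so a partition can hold at most roughly maxSplitBytes / openCostInBytes tiny files. Below is a minimal standalone sketch of that packing loop, not the patch's code: `packFiles` and its plain `Long` sizes are illustrative stand-ins for the planner's `PartitionedFile` bookkeeping, and it assumes the sizes arrive already sorted in decreasing order, as the FFD step requires.

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch of the patch's bin-packing rule: close the current partition when the
// next file would not fit, then charge the file's length plus a fixed open cost.
def packFiles(sizes: Seq[Long], maxSplitBytes: Long, openCostInBytes: Long): Seq[Seq[Long]] = {
  val partitions = ArrayBuffer.empty[Seq[Long]]
  val currentFiles = ArrayBuffer.empty[Long]
  var currentSize = 0L

  def closePartition(): Unit = {
    if (currentFiles.nonEmpty) partitions += currentFiles.toList
    currentFiles.clear()
    currentSize = 0
  }

  sizes.foreach { size =>
    if (currentSize + size > maxSplitBytes) closePartition()
    currentFiles += size
    currentSize += size + openCostInBytes  // the open cost is the new ingredient
  }
  closePartition()
  partitions.toList
}
```

For example, `packFiles(Seq(10L, 5L, 3L), maxSplitBytes = 10, openCostInBytes = 1)` yields `List(List(10), List(5, 3))`, exactly the layout the second test below asserts.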
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6cc72fba48..a7c0be63fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -510,10 +510,13 @@ object SQLConf {
     doc = "The maximum number of bytes to pack into a single partition when reading files.",
     isPublic = true)

-  val FILES_MAX_NUM_IN_PARTITION = longConf("spark.sql.files.maxNumInPartition",
-    defaultValue = Some(32),
-    doc = "The maximum number of files to pack into a single partition when reading files.",
-    isPublic = true)
+  val FILES_OPEN_COST_IN_BYTES = longConf("spark.sql.files.openCostInBytes",
+    defaultValue = Some(4 * 1024 * 1024),
+    doc = "The estimated cost to open a file, measured by the number of bytes that could be" +
+      " scanned in the same time. This is used when putting multiple files into a partition." +
+      " It is better to overestimate it; then the partitions with small files will be faster" +
+      " than partitions with bigger files (which are scheduled first).",
+    isPublic = false)

   val EXCHANGE_REUSE_ENABLED = booleanConf("spark.sql.exchange.reuse",
     defaultValue = Some(true),
@@ -572,7 +575,7 @@ class SQLConf extends Serializable with CatalystConf with Logging {

   def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)

-  def filesMaxNumInPartition: Long = getConf(FILES_MAX_NUM_IN_PARTITION)
+  def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES)

   def useCompression: Boolean = getConf(COMPRESS_CACHED)
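A note on the new knob: `spark.sql.files.openCostInBytes` defaults to 4 MB and is deliberately marked non-public, but it can still be set like any other SQL conf. A hedged sketch of overriding it (assuming a `SQLContext` named `sqlContext` is in scope; the value here is only an example):

```scala
// Raise the assumed open cost when reading many tiny files from high-latency
// storage: each file then consumes more of a partition's byte budget, so fewer
// tiny files are packed into one partition and task durations stay balanced.
sqlContext.setConf("spark.sql.files.openCostInBytes", (8 * 1024 * 1024).toString)
```

Overestimating is the safe direction, as the doc string notes: partitions full of small files then finish faster than the big-file partitions that are scheduled first.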
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 717a3a80b7..4446a6881c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -76,7 +76,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
         "file2" -> 5,
         "file3" -> 5))

-    withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10") {
+    withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "11",
+        SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
       checkScan(table.select('c1)) { partitions =>
         // 5 byte files should be laid out [(5, 5), (5)]
         assert(partitions.size == 2, "when checking partitions")
@@ -98,11 +99,12 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
     createTable(
       files = Seq(
         "file1" -> 15,
-        "file2" -> 4))
+        "file2" -> 3))

-    withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10") {
+    withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10",
+        SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
       checkScan(table.select('c1)) { partitions =>
-        // Files should be laid out [(0-5), (5-10, 4)]
+        // Files should be laid out [(0-10), (10-15, 3)]
         assert(partitions.size == 2, "when checking partitions")
         assert(partitions(0).files.size == 1, "when checking partition 1")
         assert(partitions(1).files.size == 2, "when checking partition 2")
@@ -132,8 +134,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
         "file5" -> 1,
         "file6" -> 1))

-    withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "3",
-      SQLConf.FILES_MAX_NUM_IN_PARTITION.key -> "2") {
+    withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "4",
+      SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
       checkScan(table.select('c1)) { partitions =>
         // Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)]
         assert(partitions.size == 4, "when checking partitions")
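One arithmetic note on why the first test's budget moves from 10 to 11: the open cost now counts against `maxPartitionBytes`, so after the first 5-byte file the partition already holds 5 + 1 = 6 bytes of budget. A hedged replay of the loop's bookkeeping (plain Scala; the names are illustrative, not from the suite):

```scala
val budget = 11L   // FILES_MAX_PARTITION_BYTES in the first test
val openCost = 1L  // FILES_OPEN_COST_IN_BYTES in the first test
var currentSize = 0L
// For each 5-byte file, record whether it fits in the current partition
// (the patch's close-partition check), then charge length + open cost.
val fits = Seq(5L, 5L, 5L).map { len =>
  val ok = currentSize + len <= budget
  currentSize = (if (ok) currentSize else 0L) + len + openCost
  ok
}
// fits == Seq(true, true, false): the first two files share a partition and the
// third starts a new one, i.e. [(5, 5), (5)]. With the old budget of 10, the
// second check would be 6 + 5 > 10 and every file would land alone.
```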