diff options
author | Davies Liu <davies@databricks.com> | 2016-04-22 17:09:16 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-04-22 17:09:16 -0700 |
commit | c25b97fccee557c9247ad5bf006a83a55c5e0e32 (patch) | |
tree | e9af893aadcef6635180fe98eca4c19c0f93d81f /sql/core/src/main/scala/org | |
parent | fde1340c768e18e9628e5f0eeb2f283c74c294fa (diff) | |
download | spark-c25b97fccee557c9247ad5bf006a83a55c5e0e32.tar.gz spark-c25b97fccee557c9247ad5bf006a83a55c5e0e32.tar.bz2 spark-c25b97fccee557c9247ad5bf006a83a55c5e0e32.zip |
[SPARK-14582][SQL] increase parallelism for small tables
## What changes were proposed in this pull request?
This PR try to increase the parallelism for small table (a few of big files) to reduce the query time, by decrease the maxSplitBytes, the goal is to have at least one task per CPU in the cluster, if the total size of all files is bigger than openCostInBytes * 2 * nCPU.
For example, a small/medium table could be used as dimension table in huge query, this will be useful to reduce the time waiting for broadcast.
## How was this patch tested?
Existing tests.
Author: Davies Liu <davies@databricks.com>
Closes #12344 from davies/more_partition.
Diffstat (limited to 'sql/core/src/main/scala/org')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala | 7 |
1 files changed, 6 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index ee48a7b81d..c1a97de72f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -134,8 +134,13 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { } case _ => - val maxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes + val defaultMaxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes val openCostInBytes = files.sqlContext.conf.filesOpenCostInBytes + val defaultParallelism = files.sqlContext.sparkContext.defaultParallelism + val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum + val bytesPerCore = totalBytes / defaultParallelism + val maxSplitBytes = Math.min(defaultMaxSplitBytes, + Math.max(openCostInBytes, bytesPerCore)) logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + s"open cost is considered as scanning $openCostInBytes bytes.") |