author    Davies Liu <davies@databricks.com>  2016-04-22 17:09:16 -0700
committer Reynold Xin <rxin@databricks.com>  2016-04-22 17:09:16 -0700
commit    c25b97fccee557c9247ad5bf006a83a55c5e0e32 (patch)
tree      e9af893aadcef6635180fe98eca4c19c0f93d81f /sql/core/src/main/scala/org
parent    fde1340c768e18e9628e5f0eeb2f283c74c294fa (diff)
[SPARK-14582][SQL] increase parallelism for small tables
## What changes were proposed in this pull request?

This PR tries to increase the parallelism for small tables (a few big files) to reduce query time by decreasing maxSplitBytes. The goal is to have at least one task per CPU in the cluster whenever the total size of all files is bigger than openCostInBytes * 2 * nCPU. For example, a small or medium table could be used as a dimension table in a huge query; this is useful to reduce the time spent waiting for the broadcast.

## How was this patch tested?

Existing tests.

Author: Davies Liu <davies@databricks.com>

Closes #12344 from davies/more_partition.
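For illustration, here is a minimal, self-contained Scala sketch of the split-size heuristic this patch introduces. The object name `MaxSplitBytesSketch`, the function, and its parameters are stand-ins for values the real code reads from `SQLConf` (`spark.sql.files.maxPartitionBytes`, `spark.sql.files.openCostInBytes`) and from `SparkContext.defaultParallelism`; only the arithmetic mirrors the patch.

```scala
// Standalone sketch of the heuristic added by this patch (names are
// illustrative, not Spark's API). Each file is charged its length plus
// a fixed open cost; the split size is then clamped so that small
// tables still yield roughly one split per core.
object MaxSplitBytesSketch {
  def maxSplitBytes(
      defaultMaxSplitBytes: Long,  // spark.sql.files.maxPartitionBytes
      openCostInBytes: Long,       // spark.sql.files.openCostInBytes
      defaultParallelism: Int,     // cores available in the cluster
      fileSizes: Seq[Long]): Long = {
    val totalBytes = fileSizes.map(_ + openCostInBytes).sum
    val bytesPerCore = totalBytes / defaultParallelism
    // Never exceed the configured maximum, and never drop below the
    // open cost (which would create a flood of tiny splits).
    math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
  }
}
```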
Diffstat (limited to 'sql/core/src/main/scala/org')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index ee48a7b81d..c1a97de72f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -134,8 +134,13 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
         }

       case _ =>
-        val maxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes
+        val defaultMaxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes
         val openCostInBytes = files.sqlContext.conf.filesOpenCostInBytes
+        val defaultParallelism = files.sqlContext.sparkContext.defaultParallelism
+        val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
+        val bytesPerCore = totalBytes / defaultParallelism
+        val maxSplitBytes = Math.min(defaultMaxSplitBytes,
+          Math.max(openCostInBytes, bytesPerCore))
         logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
           s"open cost is considered as scanning $openCostInBytes bytes.")