author     Davies Liu <davies@databricks.com>    2016-04-22 17:09:16 -0700
committer  Reynold Xin <rxin@databricks.com>     2016-04-22 17:09:16 -0700
commit     c25b97fccee557c9247ad5bf006a83a55c5e0e32
tree       e9af893aadcef6635180fe98eca4c19c0f93d81f
parent     fde1340c768e18e9628e5f0eeb2f283c74c294fa
[SPARK-14582][SQL] increase parallelism for small tables
## What changes were proposed in this pull request?

This PR tries to increase the parallelism for small tables (a few big files) to reduce query time, by decreasing maxSplitBytes. The goal is to have at least one task per CPU in the cluster, as long as the total size of all files is bigger than openCostInBytes * 2 * nCPU. For example, a small or medium table could be used as a dimension table in a huge query, so this is useful for reducing the time spent waiting for its broadcast.

## How was this patch tested?

Existing tests.

Author: Davies Liu <davies@databricks.com>

Closes #12344 from davies/more_partition.
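To make the new rule concrete, here is a minimal standalone sketch of the split-size computation this patch introduces (the helper function itself is illustrative; the names `defaultMaxSplitBytes`, `openCostInBytes`, and `defaultParallelism` mirror the config values and `SparkContext.defaultParallelism` used in the diff below):

```scala
// Sketch of the new split-size rule: cap splits at the configured
// maximum, but shrink them for small tables so the scan yields
// roughly one task per core, never going below the per-file open cost.
def maxSplitBytes(
    defaultMaxSplitBytes: Long,  // spark.sql.files.maxPartitionBytes
    openCostInBytes: Long,       // spark.sql.files.openCostInBytes
    defaultParallelism: Int,     // total cores in the cluster
    fileSizes: Seq[Long]): Long = {
  // Each file is charged its length plus the cost of opening it.
  val totalBytes = fileSizes.map(_ + openCostInBytes).sum
  val bytesPerCore = totalBytes / defaultParallelism
  Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
}
```

With the default 128 MB maximum partition size and 4 MB open cost, two 20 MB files read on an 8-core cluster give totalBytes = 48 MB and bytesPerCore = 6 MB, so maxSplitBytes drops from 128 MB to 6 MB and the scan produces about 8 tasks instead of 2.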
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala | 7
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala | 3
2 files changed, 9 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index ee48a7b81d..c1a97de72f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -134,8 +134,13 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
           }
 
         case _ =>
-          val maxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes
+          val defaultMaxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes
           val openCostInBytes = files.sqlContext.conf.filesOpenCostInBytes
+          val defaultParallelism = files.sqlContext.sparkContext.defaultParallelism
+          val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
+          val bytesPerCore = totalBytes / defaultParallelism
+          val maxSplitBytes = Math.min(defaultMaxSplitBytes,
+            Math.max(openCostInBytes, bytesPerCore))
           logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
             s"open cost is considered as scanning $openCostInBytes bytes.")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 4699c48c72..50cd03a40c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, RawLocalFileSystem}
 import org.apache.hadoop.mapreduce.Job
 
+import org.apache.spark.SparkConf
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper}
@@ -38,6 +39,8 @@ import org.apache.spark.util.Utils
 class FileSourceStrategySuite extends QueryTest with SharedSQLContext with PredicateHelper {
   import testImplicits._
 
+  protected override val sparkConf = new SparkConf().set("spark.default.parallelism", "1")
+
   test("unpartitioned table, single partition") {
     val table =
       createTable(
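Pinning `spark.default.parallelism` to 1 in the test suite presumably keeps the new `bytesPerCore` computation deterministic: without it, `maxSplitBytes` would vary with the core count of the machine running the tests, and the suite's partition-count assertions could differ across hosts.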