diff options
author | Robert Kruszewski <robertk@palantir.com> | 2016-09-02 17:14:43 +0200 |
---|---|---|
committer | Herman van Hovell <hvanhovell@databricks.com> | 2016-09-02 17:14:43 +0200 |
commit | 806d8a8e980d8ba2f4261bceb393c40bafaa2f73 (patch) | |
tree | 6abbac033989b267039ec934d604d03e0a4f80a7 /core/src | |
parent | 247a4faf06c1dd47a6543c56929cd0182a03e106 (diff) | |
download | spark-806d8a8e980d8ba2f4261bceb393c40bafaa2f73.tar.gz spark-806d8a8e980d8ba2f4261bceb393c40bafaa2f73.tar.bz2 spark-806d8a8e980d8ba2f4261bceb393c40bafaa2f73.zip |
[SPARK-16984][SQL] don't try whole dataset immediately when first partition doesn't have…
## What changes were proposed in this pull request?
Try increasing the number of partitions to scan incrementally, so we don't fall back to scanning the whole dataset.
## How was this patch tested?
Empirically. This is a common-case optimization.
Author: Robert Kruszewski <robertk@palantir.com>
Closes #14573 from robert3005/robertk/execute-take-backoff.
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/RDD.scala | 7 |
1 file changed, 4 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 2ee13dc4db..10b5f8291a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1296,6 +1296,7 @@ abstract class RDD[T: ClassTag]( * an exception if called on an RDD of `Nothing` or `Null`. */ def take(num: Int): Array[T] = withScope { + val scaleUpFactor = Math.max(conf.getInt("spark.rdd.limit.scaleUpFactor", 4), 2) if (num == 0) { new Array[T](0) } else { @@ -1310,12 +1311,12 @@ abstract class RDD[T: ClassTag]( // If we didn't find any rows after the previous iteration, quadruple and retry. // Otherwise, interpolate the number of partitions we need to try, but overestimate // it by 50%. We also cap the estimation in the end. - if (buf.size == 0) { - numPartsToTry = partsScanned * 4 + if (buf.isEmpty) { + numPartsToTry = partsScanned * scaleUpFactor } else { // the left side of max is >=1 whenever partsScanned >= 2 numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1) - numPartsToTry = Math.min(numPartsToTry, partsScanned * 4) + numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor) } } |