aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorRobert Kruszewski <robertk@palantir.com>2016-09-02 17:14:43 +0200
committerHerman van Hovell <hvanhovell@databricks.com>2016-09-02 17:14:43 +0200
commit806d8a8e980d8ba2f4261bceb393c40bafaa2f73 (patch)
tree6abbac033989b267039ec934d604d03e0a4f80a7 /core/src
parent247a4faf06c1dd47a6543c56929cd0182a03e106 (diff)
downloadspark-806d8a8e980d8ba2f4261bceb393c40bafaa2f73.tar.gz
spark-806d8a8e980d8ba2f4261bceb393c40bafaa2f73.tar.bz2
spark-806d8a8e980d8ba2f4261bceb393c40bafaa2f73.zip
[SPARK-16984][SQL] don't try whole dataset immediately when first partition doesn't have…
## What changes were proposed in this pull request? Try increasing the number of partitions to scan on each retry so we don't immediately fall back to scanning the whole dataset. ## How was this patch tested? Empirically. This is a common-case optimization. Author: Robert Kruszewski <robertk@palantir.com> Closes #14573 from robert3005/robertk/execute-take-backoff.
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDD.scala7
1 file changed, 4 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 2ee13dc4db..10b5f8291a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1296,6 +1296,7 @@ abstract class RDD[T: ClassTag](
* an exception if called on an RDD of `Nothing` or `Null`.
*/
def take(num: Int): Array[T] = withScope {
+ val scaleUpFactor = Math.max(conf.getInt("spark.rdd.limit.scaleUpFactor", 4), 2)
if (num == 0) {
new Array[T](0)
} else {
@@ -1310,12 +1311,12 @@ abstract class RDD[T: ClassTag](
// If we didn't find any rows after the previous iteration, quadruple and retry.
// Otherwise, interpolate the number of partitions we need to try, but overestimate
// it by 50%. We also cap the estimation in the end.
- if (buf.size == 0) {
- numPartsToTry = partsScanned * 4
+ if (buf.isEmpty) {
+ numPartsToTry = partsScanned * scaleUpFactor
} else {
// the left side of max is >=1 whenever partsScanned >= 2
numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
- numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
+ numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)
}
}