aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authoryingjieMiao <yingjie@42go.com>2014-10-13 13:11:55 -0700
committerReynold Xin <rxin@apache.org>2014-10-13 13:11:55 -0700
commit49bbdcb660edff7522430b329a300765164ccc44 (patch)
treef67be5476cf24460d0215e58040ca119fad5134d /python
parent39ccabacf11abdd9afc8f9895084c6707ff35c85 (diff)
downloadspark-49bbdcb660edff7522430b329a300765164ccc44.tar.gz
spark-49bbdcb660edff7522430b329a300765164ccc44.tar.bz2
spark-49bbdcb660edff7522430b329a300765164ccc44.zip
[Spark] RDD take() method: overestimate too much
In the comment (Line 1083), it says: "Otherwise, interpolate the number of partitions we need to try, but overestimate it by 50%." `(1.5 * num * partsScanned / buf.size).toInt` is the guess of "num of total partitions needed". In every iteration, we should consider the increment `(1.5 * num * partsScanned / buf.size).toInt - partsScanned` Existing implementation 'exponentially' grows `partsScanned ` ( roughly: `x_{n+1} >= (1.5 + 1) x_n`) This could be a performance problem. (unless this is the intended behavior) Author: yingjieMiao <yingjie@42go.com> Closes #2648 from yingjieMiao/rdd_take and squashes the following commits: d758218 [yingjieMiao] scala style fix a8e74bb [yingjieMiao] python style fix 4b6e777 [yingjieMiao] infix operator style fix 4391d3b [yingjieMiao] typo fix. 692f4e6 [yingjieMiao] cap numPartsToTry c4483dc [yingjieMiao] style fix 1d2c410 [yingjieMiao] also change in rdd.py and AsyncRDD d31ff7e [yingjieMiao] handle the edge case after 1 iteration a2aa36b [yingjieMiao] RDD take method: overestimate too much
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/rdd.py5
1 files changed, 4 insertions, 1 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index e13bab946c..15be4bfec9 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1070,10 +1070,13 @@ class RDD(object):
# If we didn't find any rows after the previous iteration,
# quadruple and retry. Otherwise, interpolate the number of
# partitions we need to try, but overestimate it by 50%.
+ # We also cap the estimation in the end.
if len(items) == 0:
numPartsToTry = partsScanned * 4
else:
- numPartsToTry = int(1.5 * num * partsScanned / len(items))
+ # the first paramter of max is >=1 whenever partsScanned >= 2
+ numPartsToTry = int(1.5 * num * partsScanned / len(items)) - partsScanned
+ numPartsToTry = min(max(numPartsToTry, 1), partsScanned * 4)
left = num - len(items)