diff options
author | Andrew Ash <andrew@andrewash.com> | 2014-09-05 18:52:05 -0700 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2014-09-05 18:52:05 -0700 |
commit | ba5bcaddecd54811d45c5fc79a013b3857d4c633 (patch) | |
tree | b5ada50e5a507971c37268f8bc216c588daa8c48 /python | |
parent | 7ff8c45d714e0f2315910838b739c0c034672015 (diff) | |
download | spark-ba5bcaddecd54811d45c5fc79a013b3857d4c633.tar.gz spark-ba5bcaddecd54811d45c5fc79a013b3857d4c633.tar.bz2 spark-ba5bcaddecd54811d45c5fc79a013b3857d4c633.zip |
SPARK-3211 .take() is OOM-prone with empty partitions
Instead of jumping straight from 1 partition to all partitions, do exponential
growth and double the number of partitions to attempt each time instead.
Fix proposed by Paul Nepywoda
Author: Andrew Ash <andrew@andrewash.com>
Closes #2117 from ash211/SPARK-3211 and squashes the following commits:
8b2299a [Andrew Ash] Quadruple instead of double for a minor speedup
e5f7e4d [Andrew Ash] Update comment to better reflect what we're doing
09a27f7 [Andrew Ash] Update PySpark to be less OOM-prone as well
3a156b8 [Andrew Ash] SPARK-3211 .take() is OOM-prone with empty partitions
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/rdd.py | 8 |
1 files changed, 4 insertions, 4 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index dff6fc26fc..04f13523b4 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1089,11 +1089,11 @@ class RDD(object): # we actually cap it at totalParts in runJob. numPartsToTry = 1 if partsScanned > 0: - # If we didn't find any rows after the first iteration, just - # try all partitions next. Otherwise, interpolate the number - # of partitions we need to try, but overestimate it by 50%. + # If we didn't find any rows after the previous iteration, + # quadruple and retry. Otherwise, interpolate the number of + # partitions we need to try, but overestimate it by 50%. if len(items) == 0: - numPartsToTry = totalParts - 1 + numPartsToTry = partsScanned * 4 else: numPartsToTry = int(1.5 * num * partsScanned / len(items)) |