aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Ash <andrew@andrewash.com>2014-09-05 18:52:05 -0700
committerMatei Zaharia <matei@databricks.com>2014-09-05 18:52:05 -0700
commitba5bcaddecd54811d45c5fc79a013b3857d4c633 (patch)
treeb5ada50e5a507971c37268f8bc216c588daa8c48
parent7ff8c45d714e0f2315910838b739c0c034672015 (diff)
downloadspark-ba5bcaddecd54811d45c5fc79a013b3857d4c633.tar.gz
spark-ba5bcaddecd54811d45c5fc79a013b3857d4c633.tar.bz2
spark-ba5bcaddecd54811d45c5fc79a013b3857d4c633.zip
SPARK-3211 .take() is OOM-prone with empty partitions
Instead of jumping straight from 1 partition to all partitions, do exponential growth and double the number of partitions to attempt each time instead. Fix proposed by Paul Nepywoda Author: Andrew Ash <andrew@andrewash.com> Closes #2117 from ash211/SPARK-3211 and squashes the following commits: 8b2299a [Andrew Ash] Quadruple instead of double for a minor speedup e5f7e4d [Andrew Ash] Update comment to better reflect what we're doing 09a27f7 [Andrew Ash] Update PySpark to be less OOM-prone as well 3a156b8 [Andrew Ash] SPARK-3211 .take() is OOM-prone with empty partitions
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDD.scala7
-rw-r--r--python/pyspark/rdd.py8
2 files changed, 7 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index af9e31ba7b..1cf55e86f6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1064,11 +1064,10 @@ abstract class RDD[T: ClassTag](
// greater than totalParts because we actually cap it at totalParts in runJob.
var numPartsToTry = 1
if (partsScanned > 0) {
- // If we didn't find any rows after the first iteration, just try all partitions next.
- // Otherwise, interpolate the number of partitions we need to try, but overestimate it
- // by 50%.
+ // If we didn't find any rows after the previous iteration, quadruple and retry. Otherwise,
+ // interpolate the number of partitions we need to try, but overestimate it by 50%.
if (buf.size == 0) {
- numPartsToTry = totalParts - 1
+ numPartsToTry = partsScanned * 4
} else {
numPartsToTry = (1.5 * num * partsScanned / buf.size).toInt
}
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index dff6fc26fc..04f13523b4 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1089,11 +1089,11 @@ class RDD(object):
# we actually cap it at totalParts in runJob.
numPartsToTry = 1
if partsScanned > 0:
- # If we didn't find any rows after the first iteration, just
- # try all partitions next. Otherwise, interpolate the number
- # of partitions we need to try, but overestimate it by 50%.
+ # If we didn't find any rows after the previous iteration,
+ # quadruple and retry. Otherwise, interpolate the number of
+ # partitions we need to try, but overestimate it by 50%.
if len(items) == 0:
- numPartsToTry = totalParts - 1
+ numPartsToTry = partsScanned * 4
else:
numPartsToTry = int(1.5 * num * partsScanned / len(items))