aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas Graves <tgraves@prevailsail.corp.gq1.yahoo.com>2016-06-01 13:21:40 -0700
committerDavies Liu <davies.liu@gmail.com>2016-06-01 13:21:40 -0700
commit5b08ee6396aeb4e0aa6139892a27186813c90931 (patch)
tree1f576c0e56c9986b43406000e9d263d103c1008c
parent2402b91461146289a78d6cbc53ed8338543e6de7 (diff)
downloadspark-5b08ee6396aeb4e0aa6139892a27186813c90931.tar.gz
spark-5b08ee6396aeb4e0aa6139892a27186813c90931.tar.bz2
spark-5b08ee6396aeb4e0aa6139892a27186813c90931.zip
[SPARK-15671] performance regression CoalesceRDD.pickBin with large #…
I was running a 15TB join job with 202000 partitions. It looks like the changes I made to CoalesceRDD in pickBin() are really slow with that large of partitions. The array filter with that many elements just takes to long. It took about an hour for it to pickBins for all the partitions. original change: https://github.com/apache/spark/commit/83ee92f60345f016a390d61a82f1d924f64ddf90 Just reverting the pickBin code back to get currpreflocs fixes the issue After reverting the pickBin code the coalesce takes about 10 seconds so for now it makes sense to revert those changes and we can look at further optimizations later. Tested this via RDDSuite unit test and manually testing the very large job. Author: Thomas Graves <tgraves@prevailsail.corp.gq1.yahoo.com> Closes #13443 from tgravescs/SPARK-15671.
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala10
1 files changed, 7 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index c19ed1529b..2ec9846e33 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -169,6 +169,11 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
var noLocality = true // if true if no preferredLocations exists for parent RDD
+ // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
+ def currPrefLocs(part: Partition, prev: RDD[_]): Seq[String] = {
+ prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host)
+ }
+
class PartitionLocations(prev: RDD[_]) {
// contains all the partitions from the previous RDD that don't have preferred locations
@@ -184,7 +189,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
val tmpPartsWithLocs = mutable.LinkedHashMap[Partition, Seq[String]]()
// first get the locations for each partition, only do this once since it can be expensive
prev.partitions.foreach(p => {
- val locs = prev.context.getPreferredLocs(prev, p.index).map(tl => tl.host)
+ val locs = currPrefLocs(p, prev)
if (locs.size > 0) {
tmpPartsWithLocs.put(p, locs)
} else {
@@ -287,9 +292,8 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
balanceSlack: Double,
partitionLocs: PartitionLocations): PartitionGroup = {
val slack = (balanceSlack * prev.partitions.length).toInt
- val preflocs = partitionLocs.partsWithLocs.filter(_._2 == p).map(_._1).toSeq
// least loaded pref locs
- val pref = preflocs.map(getLeastGroupHash(_)).sortWith(compare) // least loaded pref locs
+ val pref = currPrefLocs(p, prev).map(getLeastGroupHash(_)).sortWith(compare)
val prefPart = if (pref == Nil) None else pref.head
val r1 = rnd.nextInt(groupArr.size)