diff options
author | Takuya UESHIN <ueshin@happy-camper.st> | 2015-05-28 19:05:12 -0700 |
---|---|---|
committer | Kay Ousterhout <kayousterhout@gmail.com> | 2015-05-28 19:05:12 -0700 |
commit | 9b692bfdfcc91b32498865d21138cf215a378665 (patch) | |
tree | 47c6c4e57550e1c1c36785eb1088b8ec3685ceec /core/src/main/scala | |
parent | 66c49ed60dcef48a6b38ae2d2c4c479933f3aa19 (diff) | |
download | spark-9b692bfdfcc91b32498865d21138cf215a378665.tar.gz spark-9b692bfdfcc91b32498865d21138cf215a378665.tar.bz2 spark-9b692bfdfcc91b32498865d21138cf215a378665.zip |
[SPARK-7826] [CORE] Suppress extra calling getCacheLocs.
There are too many extra call method `getCacheLocs` for `DAGScheduler`, which includes Akka communication.
To improve `DAGScheduler` performance, suppress extra calling the method.
In my application with over 1200 stages, the execution time became 3.8 min from 8.5 min with my patch.
Author: Takuya UESHIN <ueshin@happy-camper.st>
Closes #6352 from ueshin/issues/SPARK-7826 and squashes the following commits:
3d4d036 [Takuya UESHIN] Modify a test and the documentation.
10b1b22 [Takuya UESHIN] Simplify the unit test.
d858b59 [Takuya UESHIN] Move the storageLevel check inside the if (!cacheLocs.contains(rdd.id)) block.
6f3125c [Takuya UESHIN] Fix scalastyle.
b9c835c [Takuya UESHIN] Put the condition that checks if the RDD has uncached partition or not into variable for readability.
f87f2ec [Takuya UESHIN] Get cached locations from block manager only if the storage level of the RDD is not StorageLevel.NONE.
8248386 [Takuya UESHIN] Revert "Suppress extra calling getCacheLocs."
a4d944a [Takuya UESHIN] Add an unit test.
9a80fad [Takuya UESHIN] Suppress extra calling getCacheLocs.
Diffstat (limited to 'core/src/main/scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 15 |
1 files changed, 11 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index a083be2448..a2299e907c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -193,9 +193,15 @@ class DAGScheduler( def getCacheLocs(rdd: RDD[_]): Seq[Seq[TaskLocation]] = cacheLocs.synchronized { // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times if (!cacheLocs.contains(rdd.id)) { - val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId] - val locs: Seq[Seq[TaskLocation]] = blockManagerMaster.getLocations(blockIds).map { bms => - bms.map(bm => TaskLocation(bm.host, bm.executorId)) + // Note: if the storage level is NONE, we don't need to get locations from block manager. + val locs: Seq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) { + Seq.fill(rdd.partitions.size)(Nil) + } else { + val blockIds = + rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId] + blockManagerMaster.getLocations(blockIds).map { bms => + bms.map(bm => TaskLocation(bm.host, bm.executorId)) + } } cacheLocs(rdd.id) = locs } @@ -382,7 +388,8 @@ class DAGScheduler( def visit(rdd: RDD[_]) { if (!visited(rdd)) { visited += rdd - if (getCacheLocs(rdd).contains(Nil)) { + val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil) + if (rddHasUncachedPartitions) { for (dep <- rdd.dependencies) { dep match { case shufDep: ShuffleDependency[_, _, _] => |