From 3fb57a0ab3d76fda2301dbe9f2f3fa6743b4ed78 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sat, 6 Sep 2014 19:06:30 -0700 Subject: [SPARK-3353] parent stage should have lower stage id. Previously parent stages had higher stage id, but parent stages are executed first. This pull request changes the behavior so parent stages would have lower stage id. For example, command: ```scala sc.parallelize(1 to 10).map(x=>(x,x)).reduceByKey(_+_).count ``` breaks down into 2 stages. The old web UI: ![screen shot 2014-09-04 at 12 42 44 am](https://cloud.githubusercontent.com/assets/323388/4146177/60fb4f42-3407-11e4-819f-853eb0e22b25.png) Web UI with this patch: ![screen shot 2014-09-04 at 12 44 55 am](https://cloud.githubusercontent.com/assets/323388/4146178/62e08e62-3407-11e4-867b-a36b10534464.png) Author: Reynold Xin Closes #2273 from rxin/lower-stage-id and squashes the following commits: abbb4c6 [Reynold Xin] Fixed SparkListenerSuite. 0e02379 [Reynold Xin] Updated DAGSchedulerSuite. 54ccea3 [Reynold Xin] [SPARK-3353] parent stage should have lower stage id. --- .../org/apache/spark/scheduler/DAGScheduler.scala | 4 ++-- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 25 ++++++++++++++-------- .../spark/scheduler/SparkListenerSuite.scala | 2 +- 3 files changed, 19 insertions(+), 12 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 2ccc27324a..6fcf9e3154 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -241,9 +241,9 @@ class DAGScheduler( callSite: CallSite) : Stage = { + val parentStages = getParentStages(rdd, jobId) val id = nextStageId.getAndIncrement() - val stage = - new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite) + val stage = new Stage(id, rdd, numTasks, shuffleDep, parentStages, jobId, callSite) stageIdToStage(id) = stage updateJobIdStageIdMaps(jobId, stage) stage diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 0bb91febde..aa73469b6a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -27,6 +27,7 @@ import org.scalatest.concurrent.Timeouts import org.scalatest.time.SpanSugar._ import org.apache.spark._ +import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster} @@ -97,10 +98,12 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F /** Length of time to wait while draining listener events. */ val WAIT_TIMEOUT_MILLIS = 10000 val sparkListener = new SparkListener() { - val successfulStages = new HashSet[Int]() - val failedStages = new ArrayBuffer[Int]() + val successfulStages = new HashSet[Int] + val failedStages = new ArrayBuffer[Int] + val stageByOrderOfExecution = new ArrayBuffer[Int] override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { val stageInfo = stageCompleted.stageInfo + stageByOrderOfExecution += stageInfo.stageId if (stageInfo.failureReason.isEmpty) { successfulStages += stageInfo.stageId } else { @@ -231,6 +234,13 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F runEvent(JobCancelled(jobId)) } + test("[SPARK-3353] parent stage should have lower stage id") { + sparkListener.stageByOrderOfExecution.clear() + sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _, 4).count() + assert(sparkListener.stageByOrderOfExecution.length === 2) + assert(sparkListener.stageByOrderOfExecution(0) < sparkListener.stageByOrderOfExecution(1)) + } + test("zero split job") { var numResults = 0 val fakeListener = new JobListener() { @@ -457,7 +467,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F null, null)) assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) - assert(sparkListener.failedStages.contains(0)) + assert(sparkListener.failedStages.contains(1)) // The second ResultTask fails, with a fetch failure for the output from the second mapper. runEvent(CompletionEvent( @@ -515,8 +525,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F // Listener bus should get told about the map stage failing, but not the reduce stage // (since the reduce stage hasn't been started yet). assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) - assert(sparkListener.failedStages.contains(1)) - assert(sparkListener.failedStages.size === 1) + assert(sparkListener.failedStages.toSet === Set(0)) assertDataStructuresEmpty } @@ -563,14 +572,12 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F val stageFailureMessage = "Exception failure in map stage" failed(taskSets(0), stageFailureMessage) - assert(cancelledStages.contains(1)) + assert(cancelledStages.toSet === Set(0, 2)) // Make sure the listeners got told about both failed stages. assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) assert(sparkListener.successfulStages.isEmpty) - assert(sparkListener.failedStages.contains(1)) - assert(sparkListener.failedStages.contains(3)) - assert(sparkListener.failedStages.size === 2) + assert(sparkListener.failedStages.toSet === Set(0, 2)) assert(listener1.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage") assert(listener2.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage") diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 3b0b8e2f68..ab35e8edc4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -180,7 +180,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers rdd3.count() assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) listener.stageInfos.size should be {2} // Shuffle map stage + result stage - val stageInfo3 = listener.stageInfos.keys.find(_.stageId == 2).get + val stageInfo3 = listener.stageInfos.keys.find(_.stageId == 3).get stageInfo3.rddInfos.size should be {1} // ShuffledRDD stageInfo3.rddInfos.forall(_.numPartitions == 4) should be {true} stageInfo3.rddInfos.exists(_.name == "Trois") should be {true} -- cgit v1.2.3