diff options
author | zsxwing <zsxwing@gmail.com> | 2015-01-04 21:06:04 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-01-04 21:06:04 -0800 |
commit | 6c726a3fbd9cd3aa5f3a1992b2132b25eabb76a0 (patch) | |
tree | eae3305e683af94082931e357a572170336ceea3 /core | |
parent | 72396522bcf5303f761956658510672e4feb2845 (diff) | |
download | spark-6c726a3fbd9cd3aa5f3a1992b2132b25eabb76a0.tar.gz spark-6c726a3fbd9cd3aa5f3a1992b2132b25eabb76a0.tar.bz2 spark-6c726a3fbd9cd3aa5f3a1992b2132b25eabb76a0.zip |
[SPARK-5069][Core] Fix the race condition of TaskSchedulerImpl.dagScheduler
It's not necessary to set `TaskSchedulerImpl.dagScheduler` in preStart. It's safe to set it after `initializeEventProcessActor()`.
Author: zsxwing <zsxwing@gmail.com>
Closes #3887 from zsxwing/SPARK-5069 and squashes the following commits:
d95894f [zsxwing] Fix the race condition of TaskSchedulerImpl.dagScheduler
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 7 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala | 1 |
2 files changed, 1 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index cb8ccfbdbd..259621d263 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -138,6 +138,7 @@ class DAGScheduler( } initializeEventProcessActor() + taskScheduler.setDAGScheduler(this) // Called by TaskScheduler to report task's starting. def taskStarted(task: Task[_], taskInfo: TaskInfo) { @@ -1375,12 +1376,6 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler) private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler) extends Actor with Logging { - override def preStart() { - // set DAGScheduler for taskScheduler to ensure eventProcessActor is always - // valid when the messages arrive - dagScheduler.taskScheduler.setDAGScheduler(dagScheduler) - } - /** * The main event loop of the DAG scheduler. */ diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 40aaf9dd1f..00812e6018 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -305,7 +305,6 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin override def taskStarted(task: Task[_], taskInfo: TaskInfo) {} override def executorAdded(execId: String, host: String) {} } - taskScheduler.setDAGScheduler(dagScheduler) // Give zero core offers. Should not generate any tasks val zeroCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", 0), new WorkerOffer("executor1", "host1", 0)) |