aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMark Hamstra <markhamstra@gmail.com>2013-11-26 14:06:59 -0800
committerMark Hamstra <markhamstra@gmail.com>2013-12-03 09:57:32 -0800
commitf55d0b935d7c148f49b15932938e91150b64466f (patch)
tree1ef00d1ef1e6127bf2522fbff60a61ef5e8ed395 /core
parentc9fcd909d0f86b08935a132409888b30e989bca4 (diff)
downloadspark-f55d0b935d7c148f49b15932938e91150b64466f.tar.gz
spark-f55d0b935d7c148f49b15932938e91150b64466f.tar.bz2
spark-f55d0b935d7c148f49b15932938e91150b64466f.zip
Synchronous, inline cleanup after runLocally
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala2
3 files changed, 6 insertions, 13 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b371a2412f..b849867519 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -597,14 +597,6 @@ class DAGScheduler(
listenerBus.post(SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics))
handleTaskCompletion(completion)
- case LocalJobCompleted(job, result) =>
- val stage = job.finalStage
- stageIdToJobIds -= stage.id // clean up data structures that were populated for a local job,
- stageIdToStage -= stage.id // but that won't get cleaned up via the normal paths through
- stageToInfos -= stage // completion events or stage abort
- jobIdToStageIds -= job.jobId
- listenerBus.post(SparkListenerJobEnd(job, result))
-
case TaskSetFailed(taskSet, reason) =>
stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason) }
@@ -691,7 +683,12 @@ class DAGScheduler(
jobResult = JobFailed(e, Some(job.finalStage))
job.listener.jobFailed(e)
} finally {
- eventProcessActor ! LocalJobCompleted(job, jobResult)
+ val s = job.finalStage
+ stageIdToJobIds -= s.id // clean up data structures that were populated for a local job,
+ stageIdToStage -= s.id // but that won't get cleaned up via the normal paths through
+ stageToInfos -= s // completion events or stage abort
+ jobIdToStageIds -= job.jobId
+ listenerBus.post(SparkListenerJobEnd(job, jobResult))
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index aa496b7ac6..add1187613 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -65,8 +65,6 @@ private[scheduler] case class CompletionEvent(
taskMetrics: TaskMetrics)
extends DAGSchedulerEvent
-private[scheduler] case class LocalJobCompleted(job: ActiveJob, result: JobResult) extends DAGSchedulerEvent
-
private[scheduler] case class ExecutorGained(execId: String, host: String) extends DAGSchedulerEvent
private[scheduler] case class ExecutorLost(execId: String) extends DAGSchedulerEvent
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 8ce8c68af3..706d84a58b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -219,8 +219,6 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
}
val jobId = scheduler.nextJobId.getAndIncrement()
runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, listener))
- assert(scheduler.stageToInfos.size === 1)
- runEvent(LocalJobCompleted(scheduler.stageToInfos.keys.head))
assert(results === Map(0 -> 42))
assertDataStructuresEmpty
}