aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2010-08-09 16:46:14 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2010-08-09 16:46:14 -0700
commit4488b3bc8a4483eeeb703d96842ea3e9695300c3 (patch)
treec6e6066f898c6dc9f09ab093c0220899657ef240 /src
parentf415b071af7f407cdf8c1796ca64394f3ec25e8e (diff)
downloadspark-4488b3bc8a4483eeeb703d96842ea3e9695300c3.tar.gz
spark-4488b3bc8a4483eeeb703d96842ea3e9695300c3.tar.bz2
spark-4488b3bc8a4483eeeb703d96842ea3e9695300c3.zip
Fixed a bug where we would incorrectly decide we've finished a parallel operation if Mesos tells us a task is finished twice
Diffstat (limited to 'src')
-rw-r--r--src/scala/spark/MesosScheduler.scala32
1 files changed, 20 insertions, 12 deletions
diff --git a/src/scala/spark/MesosScheduler.scala b/src/scala/spark/MesosScheduler.scala
index 081f720bbd..84b9d9af68 100644
--- a/src/scala/spark/MesosScheduler.scala
+++ b/src/scala/spark/MesosScheduler.scala
@@ -260,23 +260,31 @@ extends ParallelOperation
def taskFinished(status: TaskStatus) {
val tid = status.getTaskId
println("Finished TID " + tid)
- // Deserialize task result
- val result = Utils.deserialize[TaskResult[T]](status.getData)
- results(tidToIndex(tid)) = result.value
- // Update accumulators
- Accumulators.add(callingThread, result.accumUpdates)
- // Mark finished and stop if we've finished all the tasks
- finished(tidToIndex(tid)) = true
- tasksFinished += 1
- if (tasksFinished == numTasks)
- setAllFinished()
+ if (!finished(tidToIndex(tid))) {
+ // Deserialize task result
+ val result = Utils.deserialize[TaskResult[T]](status.getData)
+ results(tidToIndex(tid)) = result.value
+ // Update accumulators
+ Accumulators.add(callingThread, result.accumUpdates)
+ // Mark finished and stop if we've finished all the tasks
+ finished(tidToIndex(tid)) = true
+ tasksFinished += 1
+ if (tasksFinished == numTasks)
+ setAllFinished()
+ } else {
+ printf("Task %s had already finished, so ignoring it\n", tidToIndex(tid))
+ }
}
def taskLost(status: TaskStatus) {
val tid = status.getTaskId
println("Lost TID " + tid)
- launched(tidToIndex(tid)) = false
- tasksLaunched -= 1
+ if (!finished(tid)) {
+ launched(tidToIndex(tid)) = false
+ tasksLaunched -= 1
+ } else {
+ printf("Task %s had already finished, so ignoring it\n", tidToIndex(tid))
+ }
}
def error(code: Int, message: String) {