diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2010-08-09 16:46:14 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2010-08-09 16:46:14 -0700 |
commit | 4488b3bc8a4483eeeb703d96842ea3e9695300c3 (patch) | |
tree | c6e6066f898c6dc9f09ab093c0220899657ef240 /src | |
parent | f415b071af7f407cdf8c1796ca64394f3ec25e8e (diff) | |
download | spark-4488b3bc8a4483eeeb703d96842ea3e9695300c3.tar.gz spark-4488b3bc8a4483eeeb703d96842ea3e9695300c3.tar.bz2 spark-4488b3bc8a4483eeeb703d96842ea3e9695300c3.zip |
Fixed a bug where we would incorrectly decide we've finished a parallel operation if Mesos tells us a task is finished twice
Diffstat (limited to 'src')
-rw-r--r-- | src/scala/spark/MesosScheduler.scala | 32 |
1 files changed, 20 insertions, 12 deletions
diff --git a/src/scala/spark/MesosScheduler.scala b/src/scala/spark/MesosScheduler.scala index 081f720bbd..84b9d9af68 100644 --- a/src/scala/spark/MesosScheduler.scala +++ b/src/scala/spark/MesosScheduler.scala @@ -260,23 +260,31 @@ extends ParallelOperation def taskFinished(status: TaskStatus) { val tid = status.getTaskId println("Finished TID " + tid) - // Deserialize task result - val result = Utils.deserialize[TaskResult[T]](status.getData) - results(tidToIndex(tid)) = result.value - // Update accumulators - Accumulators.add(callingThread, result.accumUpdates) - // Mark finished and stop if we've finished all the tasks - finished(tidToIndex(tid)) = true - tasksFinished += 1 - if (tasksFinished == numTasks) - setAllFinished() + if (!finished(tidToIndex(tid))) { + // Deserialize task result + val result = Utils.deserialize[TaskResult[T]](status.getData) + results(tidToIndex(tid)) = result.value + // Update accumulators + Accumulators.add(callingThread, result.accumUpdates) + // Mark finished and stop if we've finished all the tasks + finished(tidToIndex(tid)) = true + tasksFinished += 1 + if (tasksFinished == numTasks) + setAllFinished() + } else { + printf("Task %s had already finished, so ignoring it\n", tidToIndex(tid)) + } } def taskLost(status: TaskStatus) { val tid = status.getTaskId println("Lost TID " + tid) - launched(tidToIndex(tid)) = false - tasksLaunched -= 1 + if (!finished(tid)) { + launched(tidToIndex(tid)) = false + tasksLaunched -= 1 + } else { + printf("Task %s had already finished, so ignoring it\n", tidToIndex(tid)) + } } def error(code: Int, message: String) { |