diff options
author | CodingCat <zhunansjtu@gmail.com> | 2014-11-26 16:52:04 -0800 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2014-11-26 16:52:13 -0800 |
commit | 66cc2431462a5354bb50c196a59da0ffc258c466 (patch) | |
tree | 09d83f0ce94545d07149afc5375d5a0a6c135f58 /core | |
parent | 69550f761c53da80343ae982db38780cd2ad956f (diff) | |
download | spark-66cc2431462a5354bb50c196a59da0ffc258c466.tar.gz spark-66cc2431462a5354bb50c196a59da0ffc258c466.tar.bz2 spark-66cc2431462a5354bb50c196a59da0ffc258c466.zip |
[SPARK-732][SPARK-3628][CORE][RESUBMIT] eliminate duplicate update on accmulator
https://issues.apache.org/jira/browse/SPARK-3628
In current implementation, the accumulator will be updated for every successfully finished task, even the task is from a resubmitted stage, which makes the accumulator counter-intuitive
In this patch, I changed the way for the DAGScheduler to update the accumulator,
DAGScheduler maintains a HashTable, mapping the stage id to the received <accumulator_id , value> pairs. Only when the stage becomes independent, (no job needs it any more), we accumulate the values of the <accumulator_id , value> pairs, when a task finished, we check if the HashTable has contained such stageId, it saves the accumulator_id, value only when the task is the first finished task of a new stage or the stage is running for the first attempt...
Author: CodingCat <zhunansjtu@gmail.com>
Closes #2524 from CodingCat/SPARK-732-1 and squashes the following commits:
701a1e8 [CodingCat] roll back change on Accumulator.scala
1433e6f [CodingCat] make MIMA happy
b233737 [CodingCat] address Matei's comments
02261b8 [CodingCat] rollback some changes
6b0aff9 [CodingCat] update document
2b2e8cf [CodingCat] updateAccumulator
83b75f8 [CodingCat] style fix
84570d2 [CodingCat] re-enable the bad accumulator guard
1e9e14d [CodingCat] add NPE guard
21b6840 [CodingCat] simplify the patch
88d1f03 [CodingCat] fix rebase error
f74266b [CodingCat] add test case for resubmitted result stage
5cf586f [CodingCat] de-duplicate on task level
138f9b3 [CodingCat] make MIMA happy
67593d2 [CodingCat] make if allowing duplicate update as an option of accumulator
(cherry picked from commit 5af53ada65f62e6b5987eada288fb48e9211ef9d)
Signed-off-by: Matei Zaharia <matei@databricks.com>
Diffstat (limited to 'core')
3 files changed, 61 insertions, 30 deletions
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala index 2301caafb0..d8817ac03d 100644 --- a/core/src/main/scala/org/apache/spark/Accumulators.scala +++ b/core/src/main/scala/org/apache/spark/Accumulators.scala @@ -18,6 +18,7 @@ package org.apache.spark import java.io.{ObjectInputStream, Serializable} +import java.util.concurrent.atomic.AtomicLong import scala.collection.generic.Growable import scala.collection.mutable.Map @@ -228,6 +229,7 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa */ class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String]) extends Accumulable[T,T](initialValue, param, name) { + def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None) } @@ -252,7 +254,7 @@ private object Accumulators { val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]() var lastId: Long = 0 - def newId: Long = synchronized { + def newId(): Long = synchronized { lastId += 1 lastId } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index b1222af662..cb8ccfbdbd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -449,7 +449,6 @@ class DAGScheduler( } // data structures based on StageId stageIdToStage -= stageId - logDebug("After removal of stage %d, remaining stages = %d" .format(stageId, stageIdToStage.size)) } @@ -902,6 +901,34 @@ class DAGScheduler( } } + /** Merge updates from a task to our local accumulator values */ + private def updateAccumulators(event: CompletionEvent): Unit = { + val task = event.task + val stage = stageIdToStage(task.stageId) + if (event.accumUpdates != null) { + try { + Accumulators.add(event.accumUpdates) + event.accumUpdates.foreach { case (id, partialValue) => + val acc = Accumulators.originals(id).asInstanceOf[Accumulable[Any, Any]] + // To avoid UI cruft, ignore cases where value wasn't updated + if (acc.name.isDefined && partialValue != acc.zero) { + val name = acc.name.get + val stringPartialValue = Accumulators.stringifyPartialValue(partialValue) + val stringValue = Accumulators.stringifyValue(acc.value) + stage.latestInfo.accumulables(id) = AccumulableInfo(id, name, stringValue) + event.taskInfo.accumulables += + AccumulableInfo(id, name, Some(stringPartialValue), stringValue) + } + } + } catch { + // If we see an exception during accumulator update, just log the + // error and move on. + case e: Exception => + logError(s"Failed to update accumulators for $task", e) + } + } + } + /** * Responds to a task finishing. This is called inside the event loop so it assumes that it can * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside. @@ -942,27 +969,6 @@ class DAGScheduler( } event.reason match { case Success => - if (event.accumUpdates != null) { - try { - Accumulators.add(event.accumUpdates) - event.accumUpdates.foreach { case (id, partialValue) => - val acc = Accumulators.originals(id).asInstanceOf[Accumulable[Any, Any]] - // To avoid UI cruft, ignore cases where value wasn't updated - if (acc.name.isDefined && partialValue != acc.zero) { - val name = acc.name.get - val stringPartialValue = Accumulators.stringifyPartialValue(partialValue) - val stringValue = Accumulators.stringifyValue(acc.value) - stage.latestInfo.accumulables(id) = AccumulableInfo(id, name, stringValue) - event.taskInfo.accumulables += - AccumulableInfo(id, name, Some(stringPartialValue), stringValue) - } - } - } catch { - // If we see an exception during accumulator update, just log the error and move on. - case e: Exception => - logError(s"Failed to update accumulators for $task", e) - } - } listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType, event.reason, event.taskInfo, event.taskMetrics)) stage.pendingTasks -= task @@ -971,6 +977,7 @@ class DAGScheduler( stage.resultOfJob match { case Some(job) => if (!job.finished(rt.outputId)) { + updateAccumulators(event) job.finished(rt.outputId) = true job.numFinished += 1 // If the whole job has finished, remove it @@ -995,6 +1002,7 @@ class DAGScheduler( } case smt: ShuffleMapTask => + updateAccumulators(event) val status = event.result.asInstanceOf[MapStatus] val execId = status.location.executorId logDebug("ShuffleMapTask finished on " + execId) @@ -1083,7 +1091,6 @@ class DAGScheduler( } failedStages += failedStage failedStages += mapStage - // Mark the map whose fetch failed as broken in the map stage if (mapId != -1) { mapStage.removeOutputLoc(mapId, bmAddress) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 819f95634b..bdd721dc7e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -207,7 +207,18 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F assert(taskSet.tasks.size >= results.size) for ((result, i) <- results.zipWithIndex) { if (i < taskSet.tasks.size) { - runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, Map[Long, Any](), null, null)) + runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, null, null, null)) + } + } + } + + private def completeWithAccumulator(accumId: Long, taskSet: TaskSet, + results: Seq[(TaskEndReason, Any)]) { + assert(taskSet.tasks.size >= results.size) + for ((result, i) <- results.zipWithIndex) { + if (i < taskSet.tasks.size) { + runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, + Map[Long, Any]((accumId, 1)), null, null)) } } } @@ -493,17 +504,16 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F runEvent(ExecutorLost("exec-hostA")) val newEpoch = mapOutputTracker.getEpoch assert(newEpoch > oldEpoch) - val noAccum = Map[Long, Any]() val taskSet = taskSets(0) // should be ignored for being too old - runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null)) + runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, null, null)) // should work because it's a non-failed host - runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), noAccum, null, null)) + runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, null, null)) // should be ignored for being too old - runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null)) + runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, null, null)) // should work because it's a new epoch taskSet.tasks(1).epoch = newEpoch - runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum, null, null)) + runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, null, null)) assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA"))) complete(taskSets(1), Seq((Success, 42), (Success, 43))) @@ -728,6 +738,18 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F assert(scheduler.sc.dagScheduler === null) } + test("accumulator not calculated for resubmitted result stage") { + //just for register + val accum = new Accumulator[Int](0, SparkContext.IntAccumulatorParam) + val finalRdd = new MyRDD(sc, 1, Nil) + submit(finalRdd, Array(0)) + completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42))) + completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42))) + assert(results === Map(0 -> 42)) + assert(Accumulators.originals(accum.id).value === 1) + assertDataStructuresEmpty + } + /** * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations. * Note that this checks only the host and not the executor ID. |