From 5af53ada65f62e6b5987eada288fb48e9211ef9d Mon Sep 17 00:00:00 2001 From: CodingCat Date: Wed, 26 Nov 2014 16:52:04 -0800 Subject: [SPARK-732][SPARK-3628][CORE][RESUBMIT] eliminate duplicate update on accmulator https://issues.apache.org/jira/browse/SPARK-3628 In current implementation, the accumulator will be updated for every successfully finished task, even the task is from a resubmitted stage, which makes the accumulator counter-intuitive In this patch, I changed the way for the DAGScheduler to update the accumulator, DAGScheduler maintains a HashTable, mapping the stage id to the received pairs. Only when the stage becomes independent, (no job needs it any more), we accumulate the values of the pairs, when a task finished, we check if the HashTable has contained such stageId, it saves the accumulator_id, value only when the task is the first finished task of a new stage or the stage is running for the first attempt... Author: CodingCat Closes #2524 from CodingCat/SPARK-732-1 and squashes the following commits: 701a1e8 [CodingCat] roll back change on Accumulator.scala 1433e6f [CodingCat] make MIMA happy b233737 [CodingCat] address Matei's comments 02261b8 [CodingCat] rollback some changes 6b0aff9 [CodingCat] update document 2b2e8cf [CodingCat] updateAccumulator 83b75f8 [CodingCat] style fix 84570d2 [CodingCat] re-enable the bad accumulator guard 1e9e14d [CodingCat] add NPE guard 21b6840 [CodingCat] simplify the patch 88d1f03 [CodingCat] fix rebase error f74266b [CodingCat] add test case for resubmitted result stage 5cf586f [CodingCat] de-duplicate on task level 138f9b3 [CodingCat] make MIMA happy 67593d2 [CodingCat] make if allowing duplicate update as an option of accumulator --- docs/programming-guide.md | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'docs/programming-guide.md') diff --git a/docs/programming-guide.md b/docs/programming-guide.md index 49f319ba77..c60de6e970 100644 --- a/docs/programming-guide.md +++ b/docs/programming-guide.md @@ -1306,6 +1306,12 @@ vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam()) +For accumulator updates performed inside actions only, Spark guarantees that each task's update to the accumulator +will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware +of that each task's update may be applied more than once if tasks or job stages are re-executed. + + + # Deploying to a Cluster The [application submission guide](submitting-applications.html) describes how to submit applications to a cluster. -- cgit v1.2.3