author     CodingCat <zhunansjtu@gmail.com>      2014-11-26 16:52:04 -0800
committer  Matei Zaharia <matei@databricks.com>  2014-11-26 16:52:04 -0800
commit     5af53ada65f62e6b5987eada288fb48e9211ef9d (patch)
tree       41656128e02df28f5add223233442a2393cf4dc4 /core
parent     561d31d2f13cc7b1112ba9f9aa8f08bcd032aebb (diff)
[SPARK-732][SPARK-3628][CORE][RESUBMIT] eliminate duplicate update on accumulator
https://issues.apache.org/jira/browse/SPARK-3628

In the current implementation, the accumulator is updated for every successfully finished task, even when the task comes from a resubmitted stage, which makes the accumulator counter-intuitive.

In this patch I changed the way the DAGScheduler updates accumulators. The DAGScheduler maintains a hash table mapping each stage id to the <accumulator_id, value> pairs it has received. Only when the stage becomes independent (no job needs it any more) do we accumulate the values of those <accumulator_id, value> pairs. When a task finishes, we check whether the hash table already contains that stage id, and save the <accumulator_id, value> pair only when the task is the first finished task of a new stage or the stage is running its first attempt.

Author: CodingCat <zhunansjtu@gmail.com>

Closes #2524 from CodingCat/SPARK-732-1 and squashes the following commits:

701a1e8 [CodingCat] roll back change on Accumulator.scala
1433e6f [CodingCat] make MIMA happy
b233737 [CodingCat] address Matei's comments
02261b8 [CodingCat] rollback some changes
6b0aff9 [CodingCat] update document
2b2e8cf [CodingCat] updateAccumulator
83b75f8 [CodingCat] style fix
84570d2 [CodingCat] re-enable the bad accumulator guard
1e9e14d [CodingCat] add NPE guard
21b6840 [CodingCat] simplify the patch
88d1f03 [CodingCat] fix rebase error
f74266b [CodingCat] add test case for resubmitted result stage
5cf586f [CodingCat] de-duplicate on task level
138f9b3 [CodingCat] make MIMA happy
67593d2 [CodingCat] make if allowing duplicate update as an option of accumulator
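To make the de-duplication idea above concrete, here is a minimal, self-contained Scala sketch. It is not the patch itself: AccumDedupSketch, onTaskSuccess, and the plain Long counter are illustrative stand-ins for the DAGScheduler's internals. The point it demonstrates is the task-level de-duplication the patch converged on (see "5cf586f de-duplicate on task level" above): a task's accumulator updates are applied only the first time its partition completes, so a resubmitted stage cannot double-count.

import scala.collection.mutable

object AccumDedupSketch {
  // Partitions whose updates have already been applied, keyed by stage id.
  private val appliedPartitions = mutable.Map[Int, mutable.Set[Int]]()
  private var total = 0L // stands in for the real accumulator value

  def onTaskSuccess(stageId: Int, partition: Int, update: Long): Unit = {
    val seen = appliedPartitions.getOrElseUpdate(stageId, mutable.Set[Int]())
    if (seen.add(partition)) { // add() returns true only for the first completion
      total += update
    }
  }

  def main(args: Array[String]): Unit = {
    onTaskSuccess(stageId = 0, partition = 0, update = 1)
    onTaskSuccess(stageId = 0, partition = 0, update = 1) // resubmitted duplicate: ignored
    println(total) // prints 1, not 2
  }
}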
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/Accumulators.scala                |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala      | 53
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 34
3 files changed, 61 insertions(+), 30 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index dc1e8f6c21..000bbd6b53 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -18,6 +18,7 @@
package org.apache.spark
import java.io.{ObjectInputStream, Serializable}
+import java.util.concurrent.atomic.AtomicLong
import scala.collection.generic.Growable
import scala.collection.mutable.Map
@@ -228,6 +229,7 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
*/
class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String])
extends Accumulable[T,T](initialValue, param, name) {
+
def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None)
}
@@ -282,7 +284,7 @@ private object Accumulators {
val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]()
var lastId: Long = 0
- def newId: Long = synchronized {
+ def newId(): Long = synchronized {
lastId += 1
lastId
}
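The hunk above renames newId to newId(): by Scala convention a side-effecting method takes empty parentheses, and the body stays guarded by synchronized. As an aside, the AtomicLong imported in the first hunk could express the same unique-ID counter without a lock. The sketch below is only an illustrative alternative under that assumption, not what the patch ships:

import java.util.concurrent.atomic.AtomicLong

object IdGenerator {
  private val lastId = new AtomicLong(0L)
  // Equivalent to the synchronized version above: returns 1, 2, 3, ...
  def newId(): Long = lastId.incrementAndGet()
}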
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b1222af662..cb8ccfbdbd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -449,7 +449,6 @@ class DAGScheduler(
}
// data structures based on StageId
stageIdToStage -= stageId
-
logDebug("After removal of stage %d, remaining stages = %d"
.format(stageId, stageIdToStage.size))
}
@@ -902,6 +901,34 @@ class DAGScheduler(
}
}
+ /** Merge updates from a task to our local accumulator values */
+ private def updateAccumulators(event: CompletionEvent): Unit = {
+ val task = event.task
+ val stage = stageIdToStage(task.stageId)
+ if (event.accumUpdates != null) {
+ try {
+ Accumulators.add(event.accumUpdates)
+ event.accumUpdates.foreach { case (id, partialValue) =>
+ val acc = Accumulators.originals(id).asInstanceOf[Accumulable[Any, Any]]
+ // To avoid UI cruft, ignore cases where value wasn't updated
+ if (acc.name.isDefined && partialValue != acc.zero) {
+ val name = acc.name.get
+ val stringPartialValue = Accumulators.stringifyPartialValue(partialValue)
+ val stringValue = Accumulators.stringifyValue(acc.value)
+ stage.latestInfo.accumulables(id) = AccumulableInfo(id, name, stringValue)
+ event.taskInfo.accumulables +=
+ AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
+ }
+ }
+ } catch {
+ // If we see an exception during accumulator update, just log the
+ // error and move on.
+ case e: Exception =>
+ logError(s"Failed to update accumulators for $task", e)
+ }
+ }
+ }
+
/**
* Responds to a task finishing. This is called inside the event loop so it assumes that it can
* modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
@@ -942,27 +969,6 @@ class DAGScheduler(
}
event.reason match {
case Success =>
- if (event.accumUpdates != null) {
- try {
- Accumulators.add(event.accumUpdates)
- event.accumUpdates.foreach { case (id, partialValue) =>
- val acc = Accumulators.originals(id).asInstanceOf[Accumulable[Any, Any]]
- // To avoid UI cruft, ignore cases where value wasn't updated
- if (acc.name.isDefined && partialValue != acc.zero) {
- val name = acc.name.get
- val stringPartialValue = Accumulators.stringifyPartialValue(partialValue)
- val stringValue = Accumulators.stringifyValue(acc.value)
- stage.latestInfo.accumulables(id) = AccumulableInfo(id, name, stringValue)
- event.taskInfo.accumulables +=
- AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
- }
- }
- } catch {
- // If we see an exception during accumulator update, just log the error and move on.
- case e: Exception =>
- logError(s"Failed to update accumulators for $task", e)
- }
- }
listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
event.reason, event.taskInfo, event.taskMetrics))
stage.pendingTasks -= task
@@ -971,6 +977,7 @@ class DAGScheduler(
stage.resultOfJob match {
case Some(job) =>
if (!job.finished(rt.outputId)) {
+ updateAccumulators(event)
job.finished(rt.outputId) = true
job.numFinished += 1
// If the whole job has finished, remove it
@@ -995,6 +1002,7 @@ class DAGScheduler(
}
case smt: ShuffleMapTask =>
+ updateAccumulators(event)
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
logDebug("ShuffleMapTask finished on " + execId)
@@ -1083,7 +1091,6 @@ class DAGScheduler(
}
failedStages += failedStage
failedStages += mapStage
-
// Mark the map whose fetch failed as broken in the map stage
if (mapId != -1) {
mapStage.removeOutputLoc(mapId, bmAddress)
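The two call sites above are where the de-duplication actually lands: for a ResultTask, updateAccumulators(event) runs only inside the if (!job.finished(rt.outputId)) branch, so a second completion of an already-finished partition never touches the accumulators. Below is a simplified sketch of that guard pattern, with hypothetical stand-in names (JobTracker, onResultTask) rather than the DAGScheduler's real types:

class JobTracker(numPartitions: Int) {
  private val finished = Array.fill(numPartitions)(false)
  private var numFinished = 0

  // applyAccumUpdates is evaluated only for the first completion of a
  // partition; a duplicate from a resubmitted stage falls through.
  def onResultTask(outputId: Int)(applyAccumUpdates: => Unit): Unit = {
    if (!finished(outputId)) {
      applyAccumUpdates
      finished(outputId) = true
      numFinished += 1
    }
  }
}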
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 819f95634b..bdd721dc7e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -207,7 +207,18 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
assert(taskSet.tasks.size >= results.size)
for ((result, i) <- results.zipWithIndex) {
if (i < taskSet.tasks.size) {
- runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, Map[Long, Any](), null, null))
+ runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, null, null, null))
+ }
+ }
+ }
+
+ private def completeWithAccumulator(accumId: Long, taskSet: TaskSet,
+ results: Seq[(TaskEndReason, Any)]) {
+ assert(taskSet.tasks.size >= results.size)
+ for ((result, i) <- results.zipWithIndex) {
+ if (i < taskSet.tasks.size) {
+ runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2,
+ Map[Long, Any]((accumId, 1)), null, null))
}
}
}
@@ -493,17 +504,16 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
runEvent(ExecutorLost("exec-hostA"))
val newEpoch = mapOutputTracker.getEpoch
assert(newEpoch > oldEpoch)
- val noAccum = Map[Long, Any]()
val taskSet = taskSets(0)
// should be ignored for being too old
- runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null))
+ runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, null, null))
// should work because it's a non-failed host
- runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), noAccum, null, null))
+ runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, null, null))
// should be ignored for being too old
- runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null))
+ runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, null, null))
// should work because it's a new epoch
taskSet.tasks(1).epoch = newEpoch
- runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum, null, null))
+ runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, null, null))
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
complete(taskSets(1), Seq((Success, 42), (Success, 43)))
@@ -728,6 +738,18 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
assert(scheduler.sc.dagScheduler === null)
}
+ test("accumulator not calculated for resubmitted result stage") {
+ // just to register the accumulator
+ val accum = new Accumulator[Int](0, SparkContext.IntAccumulatorParam)
+ val finalRdd = new MyRDD(sc, 1, Nil)
+ submit(finalRdd, Array(0))
+ completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
+ completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
+ assert(results === Map(0 -> 42))
+ assert(Accumulators.originals(accum.id).value === 1)
+ assertDataStructuresEmpty
+ }
+
/**
* Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
* Note that this checks only the host and not the executor ID.
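For the user-facing effect the new test pins down, here is a hedged usage sketch (it assumes a local SparkContext; AccumExample and the record count are illustrative). An accumulator incremented once per record should end at exactly the record count; before this patch, duplicate completions from a resubmitted result stage could push it higher.

import org.apache.spark.{SparkConf, SparkContext}

object AccumExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("accum"))
    val count = sc.accumulator(0) // IntAccumulatorParam supplied implicitly
    sc.parallelize(1 to 100, 4).foreach(_ => count += 1)
    // With this patch, count.value is exactly 100 even if the result
    // stage is resubmitted; previously it could exceed 100.
    println(count.value)
    sc.stop()
  }
}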