aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/Accumulators.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala53
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala34
-rw-r--r--docs/programming-guide.md6
4 files changed, 67 insertions, 30 deletions
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index dc1e8f6c21..000bbd6b53 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -18,6 +18,7 @@
package org.apache.spark
import java.io.{ObjectInputStream, Serializable}
+import java.util.concurrent.atomic.AtomicLong
import scala.collection.generic.Growable
import scala.collection.mutable.Map
@@ -228,6 +229,7 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
*/
class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String])
extends Accumulable[T,T](initialValue, param, name) {
+
def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None)
}
@@ -282,7 +284,7 @@ private object Accumulators {
val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]()
var lastId: Long = 0
- def newId: Long = synchronized {
+ def newId(): Long = synchronized {
lastId += 1
lastId
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b1222af662..cb8ccfbdbd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -449,7 +449,6 @@ class DAGScheduler(
}
// data structures based on StageId
stageIdToStage -= stageId
-
logDebug("After removal of stage %d, remaining stages = %d"
.format(stageId, stageIdToStage.size))
}
@@ -902,6 +901,34 @@ class DAGScheduler(
}
}
+ /** Merge updates from a task to our local accumulator values, and record named, changed accumulators on the stage info and task info */
+ private def updateAccumulators(event: CompletionEvent): Unit = {
+ val task = event.task
+ val stage = stageIdToStage(task.stageId) // NOTE(review): this lookup sits outside the try below - confirm the stage is always still registered here
+ if (event.accumUpdates != null) {
+ try {
+ Accumulators.add(event.accumUpdates)
+ event.accumUpdates.foreach { case (id, partialValue) =>
+ val acc = Accumulators.originals(id).asInstanceOf[Accumulable[Any, Any]]
+ // To avoid UI cruft, skip unnamed accumulators and partial values equal to the accumulator's zero
+ if (acc.name.isDefined && partialValue != acc.zero) {
+ val name = acc.name.get
+ val stringPartialValue = Accumulators.stringifyPartialValue(partialValue)
+ val stringValue = Accumulators.stringifyValue(acc.value)
+ stage.latestInfo.accumulables(id) = AccumulableInfo(id, name, stringValue)
+ event.taskInfo.accumulables +=
+ AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
+ }
+ }
+ } catch {
+ // If we see an exception during accumulator update, just log the
+ // error and move on rather than rethrowing it.
+ case e: Exception =>
+ logError(s"Failed to update accumulators for $task", e)
+ }
+ }
+ }
+
/**
* Responds to a task finishing. This is called inside the event loop so it assumes that it can
* modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
@@ -942,27 +969,6 @@ class DAGScheduler(
}
event.reason match {
case Success =>
- if (event.accumUpdates != null) {
- try {
- Accumulators.add(event.accumUpdates)
- event.accumUpdates.foreach { case (id, partialValue) =>
- val acc = Accumulators.originals(id).asInstanceOf[Accumulable[Any, Any]]
- // To avoid UI cruft, ignore cases where value wasn't updated
- if (acc.name.isDefined && partialValue != acc.zero) {
- val name = acc.name.get
- val stringPartialValue = Accumulators.stringifyPartialValue(partialValue)
- val stringValue = Accumulators.stringifyValue(acc.value)
- stage.latestInfo.accumulables(id) = AccumulableInfo(id, name, stringValue)
- event.taskInfo.accumulables +=
- AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
- }
- }
- } catch {
- // If we see an exception during accumulator update, just log the error and move on.
- case e: Exception =>
- logError(s"Failed to update accumulators for $task", e)
- }
- }
listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
event.reason, event.taskInfo, event.taskMetrics))
stage.pendingTasks -= task
@@ -971,6 +977,7 @@ class DAGScheduler(
stage.resultOfJob match {
case Some(job) =>
if (!job.finished(rt.outputId)) {
+ updateAccumulators(event)
job.finished(rt.outputId) = true
job.numFinished += 1
// If the whole job has finished, remove it
@@ -995,6 +1002,7 @@ class DAGScheduler(
}
case smt: ShuffleMapTask =>
+ updateAccumulators(event)
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
logDebug("ShuffleMapTask finished on " + execId)
@@ -1083,7 +1091,6 @@ class DAGScheduler(
}
failedStages += failedStage
failedStages += mapStage
-
// Mark the map whose fetch failed as broken in the map stage
if (mapId != -1) {
mapStage.removeOutputLoc(mapId, bmAddress)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 819f95634b..bdd721dc7e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -207,7 +207,18 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
assert(taskSet.tasks.size >= results.size)
for ((result, i) <- results.zipWithIndex) {
if (i < taskSet.tasks.size) {
- runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, Map[Long, Any](), null, null))
+ runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, null, null, null))
+ }
+ }
+ }
+
+ private def completeWithAccumulator(accumId: Long, taskSet: TaskSet,
+ results: Seq[(TaskEndReason, Any)]) {
+ assert(taskSet.tasks.size >= results.size)
+ for ((result, i) <- results.zipWithIndex) {
+ if (i < taskSet.tasks.size) {
+ runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2,
+ Map[Long, Any]((accumId, 1)), null, null))
}
}
}
@@ -493,17 +504,16 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
runEvent(ExecutorLost("exec-hostA"))
val newEpoch = mapOutputTracker.getEpoch
assert(newEpoch > oldEpoch)
- val noAccum = Map[Long, Any]()
val taskSet = taskSets(0)
// should be ignored for being too old
- runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null))
+ runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, null, null))
// should work because it's a non-failed host
- runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), noAccum, null, null))
+ runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, null, null))
// should be ignored for being too old
- runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null))
+ runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, null, null))
// should work because it's a new epoch
taskSet.tasks(1).epoch = newEpoch
- runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum, null, null))
+ runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, null, null))
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
complete(taskSets(1), Seq((Success, 42), (Success, 43)))
@@ -728,6 +738,18 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
assert(scheduler.sc.dagScheduler === null)
}
+ test("accumulator not calculated for resubmitted result stage") {
+ // Created only so that the accumulator gets registered; its value is read back via Accumulators.originals below
+ val accum = new Accumulator[Int](0, SparkContext.IntAccumulatorParam)
+ val finalRdd = new MyRDD(sc, 1, Nil)
+ submit(finalRdd, Array(0))
+ completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
+ completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
+ assert(results === Map(0 -> 42))
+ assert(Accumulators.originals(accum.id).value === 1)
+ assertDataStructuresEmpty
+ }
+
/**
* Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
* Note that this checks only the host and not the executor ID.
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 49f319ba77..c60de6e970 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -1306,6 +1306,12 @@ vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())
</div>
+For accumulator updates performed inside <b>actions only</b>, Spark guarantees that each task's update to the accumulator
+will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware
+that each task's update may be applied more than once if tasks or job stages are re-executed.
+
+
+
# Deploying to a Cluster
The [application submission guide](submitting-applications.html) describes how to submit applications to a cluster.