aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-08-04 20:39:18 -0700
committerReynold Xin <rxin@apache.org>2014-08-04 20:39:18 -0700
commit05bf4e4aff0d052a53d3e64c43688f07e27fec50 (patch)
tree6b15e9154b3200672f3190700178a99a0fdce2cb
parent9fd82dbbcb8b10debbe95f1acab53ae8b340f38e (diff)
downloadspark-05bf4e4aff0d052a53d3e64c43688f07e27fec50.tar.gz
spark-05bf4e4aff0d052a53d3e64c43688f07e27fec50.tar.bz2
spark-05bf4e4aff0d052a53d3e64c43688f07e27fec50.zip
[SPARK-2323] Exception in accumulator update should not crash DAGScheduler & SparkContext
Author: Reynold Xin <rxin@apache.org> Closes #1772 from rxin/accumulator-dagscheduler and squashes the following commits: 6a58520 [Reynold Xin] [SPARK-2323] Exception in accumulator update should not crash DAGScheduler & SparkContext.
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala9
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala11
2 files changed, 10 insertions, 10 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index d87c304898..9fa3a4e9c7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -904,8 +904,13 @@ class DAGScheduler(
event.reason match {
case Success =>
if (event.accumUpdates != null) {
- // TODO: fail the stage if the accumulator update fails...
- Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
+ try {
+ Accumulators.add(event.accumUpdates)
+ } catch {
+ // If we see an exception during accumulator update, just log the error and move on.
+ case e: Exception =>
+ logError(s"Failed to update accumulators for $task", e)
+ }
}
stage.pendingTasks -= task
task match {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 36e238b4c9..8c1b0fed11 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -622,8 +622,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
assertDataStructuresEmpty
}
- // TODO: Fix this and un-ignore the test.
- ignore("misbehaved accumulator should not crash DAGScheduler and SparkContext") {
+ test("misbehaved accumulator should not crash DAGScheduler and SparkContext") {
val acc = new Accumulator[Int](0, new AccumulatorParam[Int] {
override def addAccumulator(t1: Int, t2: Int): Int = t1 + t2
override def zero(initialValue: Int): Int = 0
@@ -633,14 +632,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
})
// Run this on executors
- intercept[SparkDriverExecutionException] {
- sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) }
- }
+ sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) }
// Run this within a local thread
- intercept[SparkDriverExecutionException] {
- sc.parallelize(1 to 10, 2).map { item => acc.add(1) }.take(1)
- }
+ sc.parallelize(1 to 10, 2).map { item => acc.add(1) }.take(1)
// Make sure we can still run local commands as well as cluster commands.
assert(sc.parallelize(1 to 10, 2).count() === 10)