diff options
author | Reynold Xin <rxin@apache.org> | 2014-08-04 20:39:18 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-08-04 20:39:18 -0700 |
commit | 05bf4e4aff0d052a53d3e64c43688f07e27fec50 (patch) | |
tree | 6b15e9154b3200672f3190700178a99a0fdce2cb /core | |
parent | 9fd82dbbcb8b10debbe95f1acab53ae8b340f38e (diff) | |
download | spark-05bf4e4aff0d052a53d3e64c43688f07e27fec50.tar.gz spark-05bf4e4aff0d052a53d3e64c43688f07e27fec50.tar.bz2 spark-05bf4e4aff0d052a53d3e64c43688f07e27fec50.zip |
[SPARK-2323] Exception in accumulator update should not crash DAGScheduler & SparkContext
Author: Reynold Xin <rxin@apache.org>
Closes #1772 from rxin/accumulator-dagscheduler and squashes the following commits:
6a58520 [Reynold Xin] [SPARK-2323] Exception in accumulator update should not crash DAGScheduler & SparkContext.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 9 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 11 |
2 files changed, 10 insertions, 10 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index d87c304898..9fa3a4e9c7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -904,8 +904,13 @@ class DAGScheduler( event.reason match { case Success => if (event.accumUpdates != null) { - // TODO: fail the stage if the accumulator update fails... - Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted + try { + Accumulators.add(event.accumUpdates) + } catch { + // If we see an exception during accumulator update, just log the error and move on. + case e: Exception => + logError(s"Failed to update accumulators for $task", e) + } } stage.pendingTasks -= task task match { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 36e238b4c9..8c1b0fed11 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -622,8 +622,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F assertDataStructuresEmpty } - // TODO: Fix this and un-ignore the test. - ignore("misbehaved accumulator should not crash DAGScheduler and SparkContext") { + test("misbehaved accumulator should not crash DAGScheduler and SparkContext") { val acc = new Accumulator[Int](0, new AccumulatorParam[Int] { override def addAccumulator(t1: Int, t2: Int): Int = t1 + t2 override def zero(initialValue: Int): Int = 0 @@ -633,14 +632,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F }) // Run this on executors - intercept[SparkDriverExecutionException] { - sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) } - } + sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) } // Run this within a local thread - intercept[SparkDriverExecutionException] { - sc.parallelize(1 to 10, 2).map { item => acc.add(1) }.take(1) - } + sc.parallelize(1 to 10, 2).map { item => acc.add(1) }.take(1) // Make sure we can still run local commands as well as cluster commands. assert(sc.parallelize(1 to 10, 2).count() === 10) |