author     Reynold Xin <rxin@apache.org>    2014-06-30 11:50:22 -0700
committer  Reynold Xin <rxin@apache.org>    2014-06-30 11:50:22 -0700
commit     358ae1534d01ad9e69364a21441a7ef23c2cb516 (patch)
tree       1bb1c51a40d666c6f68fb6d5abf2d976db03a1b9 /core
parent     6803642253b45c80ef8970606e350adabe4211ea (diff)
[SPARK-2322] Exception in resultHandler should NOT crash DAGScheduler and shutdown SparkContext.
This should go into 1.0.1.

Author: Reynold Xin <rxin@apache.org>

Closes #1264 from rxin/SPARK-2322 and squashes the following commits:

c77c07f [Reynold Xin] Added comment to SparkDriverExecutionException and a test case for accumulator.
5d8d920 [Reynold Xin] [SPARK-2322] Exception in resultHandler could crash DAGScheduler and shutdown SparkContext.
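For context, here is a hedged sketch of the failure mode this patch addresses (hypothetical driver code, not part of the commit): the resultHandler passed to sc.runJob is user code that runs in the driver for every finished partition, and before this change an exception thrown there could escape into the DAGScheduler's event loop and take down the SparkContext. The runJob overload below mirrors the one exercised by the new DAGSchedulerSuite test.

    import org.apache.spark.{SparkContext, TaskContext}

    // Assumes an already-constructed SparkContext `sc`.
    def resultHandlerDemo(sc: SparkContext): Unit = {
      val rdd = sc.parallelize(1 to 10, 2)
      try {
        sc.runJob[Int, Int](
          rdd,
          (context: TaskContext, iter: Iterator[Int]) => iter.size,
          Seq(0, 1),
          allowLocal = false,
          // User-supplied resultHandler running in the driver; it misbehaves.
          (partition: Int, result: Int) => throw new IllegalStateException("boom"))
      } catch {
        // With this patch the failure reaches the caller as a
        // SparkDriverExecutionException wrapping the original error, rather than
        // crashing the scheduler. The wrapper is private[spark], so plain user
        // code catches a generic Exception and inspects getCause.
        case e: Exception => println(s"job failed in driver code: ${e.getCause}")
      }
      // The SparkContext stays usable afterwards.
      println(rdd.count())
    }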
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkException.scala                8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala       20
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala   56
3 files changed, 78 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala
index 4351ed74b6..2ebd7a7151 100644
--- a/core/src/main/scala/org/apache/spark/SparkException.scala
+++ b/core/src/main/scala/org/apache/spark/SparkException.scala
@@ -22,3 +22,11 @@ class SparkException(message: String, cause: Throwable)
def this(message: String) = this(message, null)
}
+
+/**
+ * Exception thrown when execution of some user code in the driver process fails, e.g.
+ * accumulator update fails or failure in takeOrdered (the user supplies an Ordering
+ * implementation that misbehaves).
+ */
+private[spark] class SparkDriverExecutionException(cause: Throwable)
+ extends SparkException("Execution error", cause)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c8559a7a82..813a9abfaf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -581,8 +581,9 @@ class DAGScheduler(
}
} catch {
case e: Exception =>
- jobResult = JobFailed(e)
- job.listener.jobFailed(e)
+ val exception = new SparkDriverExecutionException(e)
+ jobResult = JobFailed(exception)
+ job.listener.jobFailed(exception)
case oom: OutOfMemoryError =>
val exception = new SparkException("Local job aborted due to out of memory error", oom)
jobResult = JobFailed(exception)
@@ -822,6 +823,7 @@ class DAGScheduler(
case Success =>
logInfo("Completed " + task)
if (event.accumUpdates != null) {
+ // TODO: fail the stage if the accumulator update fails...
Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
}
pendingTasks(stage) -= task
@@ -838,7 +840,16 @@ class DAGScheduler(
cleanupStateForJobAndIndependentStages(job, Some(stage))
listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
}
- job.listener.taskSucceeded(rt.outputId, event.result)
+
+ // taskSucceeded runs some user code that might throw an exception. Make sure
+ // we are resilient against that.
+ try {
+ job.listener.taskSucceeded(rt.outputId, event.result)
+ } catch {
+ case e: Exception =>
+ // TODO: Perhaps we want to mark the stage as failed?
+ job.listener.jobFailed(new SparkDriverExecutionException(e))
+ }
}
case None =>
logInfo("Ignoring result from " + rt + " because its job has finished")
@@ -1161,8 +1172,7 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
override val supervisorStrategy =
OneForOneStrategy() {
case x: Exception =>
- logError("eventProcesserActor failed due to the error %s; shutting down SparkContext"
- .format(x.getMessage))
+ logError("eventProcesserActor failed; shutting down SparkContext", x)
try {
dagScheduler.doCancelAllJobs()
} catch {
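The last hunk above also changes how the supervisor logs the actor failure: instead of formatting only the message into the log line, it passes the throwable to the two-argument logError so the full stack trace is kept. A small hedged sketch of the difference, written against SLF4J for illustration (Spark's Logging trait exposes the same two-argument shape):

    import org.slf4j.LoggerFactory

    object LoggingShapeSketch {
      private val log = LoggerFactory.getLogger(getClass)

      def report(x: Exception): Unit = {
        // Message-only form: the stack trace of `x` never makes it into the log.
        log.error("eventProcesserActor failed due to the error %s".format(x.getMessage))
        // Message-plus-throwable form: the full stack trace is logged.
        log.error("eventProcesserActor failed; shutting down SparkContext", x)
      }
    }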
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 8dd2a9b9f7..9f498d579a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -17,7 +17,6 @@
package org.apache.spark.scheduler
-import scala.Tuple2
import scala.collection.mutable.{HashSet, HashMap, Map}
import scala.language.reflectiveCalls
@@ -38,6 +37,8 @@ class BuggyDAGEventProcessActor extends Actor {
}
}
+class DAGSchedulerSuiteDummyException extends Exception
+
class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
with ImplicitSender with BeforeAndAfter with LocalSparkContext {
@@ -593,6 +594,59 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
assertDataStructuresEmpty
}
+ // TODO: Fix this and un-ignore the test.
+ ignore("misbehaved accumulator should not crash DAGScheduler and SparkContext") {
+ val acc = new Accumulator[Int](0, new AccumulatorParam[Int] {
+ override def addAccumulator(t1: Int, t2: Int): Int = t1 + t2
+ override def zero(initialValue: Int): Int = 0
+ override def addInPlace(r1: Int, r2: Int): Int = {
+ throw new DAGSchedulerSuiteDummyException
+ }
+ })
+
+ // Run this on executors
+ intercept[SparkDriverExecutionException] {
+ sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) }
+ }
+
+ // Run this within a local thread
+ intercept[SparkDriverExecutionException] {
+ sc.parallelize(1 to 10, 2).map { item => acc.add(1) }.take(1)
+ }
+
+ // Make sure we can still run local commands as well as cluster commands.
+ assert(sc.parallelize(1 to 10, 2).count() === 10)
+ assert(sc.parallelize(1 to 10, 2).first() === 1)
+ }
+
+ test("misbehaved resultHandler should not crash DAGScheduler and SparkContext") {
+ val e1 = intercept[SparkDriverExecutionException] {
+ val rdd = sc.parallelize(1 to 10, 2)
+ sc.runJob[Int, Int](
+ rdd,
+ (context: TaskContext, iter: Iterator[Int]) => iter.size,
+ Seq(0),
+ allowLocal = true,
+ (part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException)
+ }
+ assert(e1.getCause.isInstanceOf[DAGSchedulerSuiteDummyException])
+
+ val e2 = intercept[SparkDriverExecutionException] {
+ val rdd = sc.parallelize(1 to 10, 2)
+ sc.runJob[Int, Int](
+ rdd,
+ (context: TaskContext, iter: Iterator[Int]) => iter.size,
+ Seq(0, 1),
+ allowLocal = false,
+ (part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException)
+ }
+ assert(e2.getCause.isInstanceOf[DAGSchedulerSuiteDummyException])
+
+ // Make sure we can still run local commands as well as cluster commands.
+ assert(sc.parallelize(1 to 10, 2).count() === 10)
+ assert(sc.parallelize(1 to 10, 2).first() === 1)
+ }
+
test("DAGSchedulerActorSupervisor closes the SparkContext when EventProcessActor crashes") {
val actorSystem = ActorSystem("test")
val supervisor = actorSystem.actorOf(