author     Reynold Xin <rxin@apache.org>    2014-06-30 11:50:22 -0700
committer  Reynold Xin <rxin@apache.org>    2014-06-30 11:52:30 -0700
commit     d468b3d748a2459fb937feb9a6a84ec5f2258dc3 (patch)
tree       f0b96ffc75e6e9e31eeb09af596cade020e97961
parent     febec742c82852c736fe09110667e82092f2e5f9 (diff)
[SPARK-2322] Exception in resultHandler should NOT crash DAGScheduler and shutdown SparkContext.
This should go into 1.0.1.

Author: Reynold Xin <rxin@apache.org>

Closes #1264 from rxin/SPARK-2322 and squashes the following commits:

c77c07f [Reynold Xin] Added comment to SparkDriverExecutionException and a test case for accumulator.
5d8d920 [Reynold Xin] [SPARK-2322] Exception in resultHandler could crash DAGScheduler and shutdown SparkContext.

(cherry picked from commit 358ae1534d01ad9e69364a21441a7ef23c2cb516)
Signed-off-by: Reynold Xin <rxin@apache.org>

Conflicts:
	core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkException.scala              |  8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala      | 20
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 56
3 files changed, 78 insertions(+), 6 deletions(-)
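To illustrate the behavior this commit targets, here is a minimal driver-program sketch (not part of the patch). It assumes a local-mode SparkContext and the Spark 1.0-era runJob signature with the allowLocal parameter; the object name, handler, and messages are invented for illustration. Previously, an exception thrown by the resultHandler could escape the DAGScheduler event loop and bring down the SparkContext; with this patch the job fails with SparkDriverExecutionException (private[spark], a subclass of SparkException, so application code catches SparkException) and the context remains usable.

// --- Illustration only (not part of this commit) ---
import org.apache.spark.{SparkConf, SparkContext, SparkException, TaskContext}

object ResultHandlerFailureDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("SPARK-2322-demo"))
    try {
      val rdd = sc.parallelize(1 to 10, 2)
      try {
        // The resultHandler below runs in the driver process and throws on the first result.
        sc.runJob[Int, Int](
          rdd,
          (context: TaskContext, iter: Iterator[Int]) => iter.size,
          Seq(0, 1),
          allowLocal = false,
          (part: Int, result: Int) => sys.error("boom in resultHandler"))
      } catch {
        // With this patch the failure surfaces as SparkDriverExecutionException
        // (which extends SparkException); the handler's error is the cause.
        case e: SparkException => println("Job failed as expected: " + e.getCause)
      }
      // The SparkContext survives and can still run jobs.
      assert(rdd.count() == 10)
    } finally {
      sc.stop()
    }
  }
}
// --- End illustration ---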
diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala
index 4351ed74b6..2ebd7a7151 100644
--- a/core/src/main/scala/org/apache/spark/SparkException.scala
+++ b/core/src/main/scala/org/apache/spark/SparkException.scala
@@ -22,3 +22,11 @@ class SparkException(message: String, cause: Throwable)
def this(message: String) = this(message, null)
}
+
+/**
+ * Exception thrown when execution of some user code in the driver process fails, e.g.
+ * accumulator update fails or failure in takeOrdered (user supplies an Ordering implementation
+ * that can misbehave).
+ */
+private[spark] class SparkDriverExecutionException(cause: Throwable)
+ extends SparkException("Execution error", cause)
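The DAGScheduler change that follows uses this class to wrap failures of user-supplied callbacks. As a rough, self-contained sketch of that pattern (not Spark code; since SparkDriverExecutionException is private[spark], a stand-in exception class and hypothetical listener trait are used here), the idea is to run the callback inside try/catch and turn any exception into a job failure rather than letting it propagate through the scheduler:

// --- Illustration only: the guard pattern the patch applies in DAGScheduler ---
class DriverCallbackException(cause: Throwable)
  extends RuntimeException("Execution error", cause)  // stand-in for SparkDriverExecutionException

trait JobListenerLike {
  def taskSucceeded(index: Int, result: Any): Unit    // may run user code, e.g. a resultHandler
  def jobFailed(exception: Exception): Unit
}

object CallbackGuard {
  def notifyTaskSucceeded(listener: JobListenerLike, index: Int, result: Any): Unit = {
    try {
      listener.taskSucceeded(index, result)
    } catch {
      case e: Exception =>
        // User code on the driver failed: fail the job, but keep the scheduler loop alive.
        listener.jobFailed(new DriverCallbackException(e))
    }
  }
}
// --- End illustration ---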
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 81a87438a8..d15aaa3fc1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -578,8 +578,9 @@ class DAGScheduler(
}
} catch {
case e: Exception =>
- jobResult = JobFailed(e)
- job.listener.jobFailed(e)
+ val exception = new SparkDriverExecutionException(e)
+ jobResult = JobFailed(exception)
+ job.listener.jobFailed(exception)
} finally {
val s = job.finalStage
stageIdToJobIds -= s.id // clean up data structures that were populated for a local job,
@@ -815,6 +816,7 @@ class DAGScheduler(
case Success =>
logInfo("Completed " + task)
if (event.accumUpdates != null) {
+ // TODO: fail the stage if the accumulator update fails...
Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
}
pendingTasks(stage) -= task
@@ -831,7 +833,16 @@ class DAGScheduler(
cleanupStateForJobAndIndependentStages(job, Some(stage))
listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
}
- job.listener.taskSucceeded(rt.outputId, event.result)
+
+ // taskSucceeded runs some user code that might throw an exception. Make sure
+ // we are resilient against that.
+ try {
+ job.listener.taskSucceeded(rt.outputId, event.result)
+ } catch {
+ case e: Exception =>
+ // TODO: Perhaps we want to mark the stage as failed?
+ job.listener.jobFailed(new SparkDriverExecutionException(e))
+ }
}
case None =>
logInfo("Ignoring result from " + rt + " because its job has finished")
@@ -1154,8 +1165,7 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
override val supervisorStrategy =
OneForOneStrategy() {
case x: Exception =>
- logError("eventProcesserActor failed due to the error %s; shutting down SparkContext"
- .format(x.getMessage))
+ logError("eventProcesserActor failed; shutting down SparkContext", x)
try {
dagScheduler.doCancelAllJobs()
} catch {
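The last hunk above also changes the supervisor to log the full stack trace by passing the throwable to logError rather than formatting only its message. Below is a minimal, stand-alone Akka sketch of that supervision pattern, assuming the Akka 2.2/2.3-era actor API that Spark 1.0 shipped with; the actor and callback names are invented:

// --- Illustration only: supervision shape similar to DAGSchedulerActorSupervisor ---
import akka.actor.{Actor, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy.Stop

// When the child event-processing actor throws, log the full throwable,
// attempt cleanup, then stop the child.
class EventLoopSupervisor(cleanup: () => Unit) extends Actor {
  override val supervisorStrategy = OneForOneStrategy() {
    case e: Exception =>
      context.system.log.error(e, "event-processing actor failed; running cleanup")
      try cleanup() catch {
        case t: Throwable => context.system.log.error(t, "cleanup itself failed")
      }
      Stop
  }

  // Children are created by sending Props to the supervisor, as DAGScheduler does.
  def receive = {
    case p: Props => sender ! context.actorOf(p)
  }
}
// --- End illustration ---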
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 23a350be56..cea0fe11e1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -17,7 +17,6 @@
package org.apache.spark.scheduler
-import scala.Tuple2
import scala.collection.mutable.{HashSet, HashMap, Map}
import scala.language.reflectiveCalls
@@ -37,6 +36,8 @@ class BuggyDAGEventProcessActor extends Actor {
}
}
+class DAGSchedulerSuiteDummyException extends Exception
+
class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
with ImplicitSender with BeforeAndAfter with LocalSparkContext {
@@ -578,6 +579,59 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
assertDataStructuresEmpty
}
+ // TODO: Fix this and un-ignore the test.
+ ignore("misbehaved accumulator should not crash DAGScheduler and SparkContext") {
+ val acc = new Accumulator[Int](0, new AccumulatorParam[Int] {
+ override def addAccumulator(t1: Int, t2: Int): Int = t1 + t2
+ override def zero(initialValue: Int): Int = 0
+ override def addInPlace(r1: Int, r2: Int): Int = {
+ throw new DAGSchedulerSuiteDummyException
+ }
+ })
+
+ // Run this on executors
+ intercept[SparkDriverExecutionException] {
+ sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) }
+ }
+
+ // Run this within a local thread
+ intercept[SparkDriverExecutionException] {
+ sc.parallelize(1 to 10, 2).map { item => acc.add(1) }.take(1)
+ }
+
+ // Make sure we can still run local commands as well as cluster commands.
+ assert(sc.parallelize(1 to 10, 2).count() === 10)
+ assert(sc.parallelize(1 to 10, 2).first() === 1)
+ }
+
+ test("misbehaved resultHandler should not crash DAGScheduler and SparkContext") {
+ val e1 = intercept[SparkDriverExecutionException] {
+ val rdd = sc.parallelize(1 to 10, 2)
+ sc.runJob[Int, Int](
+ rdd,
+ (context: TaskContext, iter: Iterator[Int]) => iter.size,
+ Seq(0),
+ allowLocal = true,
+ (part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException)
+ }
+ assert(e1.getCause.isInstanceOf[DAGSchedulerSuiteDummyException])
+
+ val e2 = intercept[SparkDriverExecutionException] {
+ val rdd = sc.parallelize(1 to 10, 2)
+ sc.runJob[Int, Int](
+ rdd,
+ (context: TaskContext, iter: Iterator[Int]) => iter.size,
+ Seq(0, 1),
+ allowLocal = false,
+ (part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException)
+ }
+ assert(e2.getCause.isInstanceOf[DAGSchedulerSuiteDummyException])
+
+ // Make sure we can still run local commands as well as cluster commands.
+ assert(sc.parallelize(1 to 10, 2).count() === 10)
+ assert(sc.parallelize(1 to 10, 2).first() === 1)
+ }
+
test("DAGSchedulerActorSupervisor closes the SparkContext when EventProcessActor crashes") {
val actorSystem = ActorSystem("test")
val supervisor = actorSystem.actorOf(