-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala      |  12
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 129
2 files changed, 76 insertions(+), 65 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index acb4c4946e..00b8af27a7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -710,7 +710,6 @@ class DAGScheduler(
if (missing == Nil) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
submitMissingTasks(stage, jobId.get)
- runningStages += stage
} else {
for (parent <- missing) {
submitStage(parent)
@@ -753,11 +752,14 @@ class DAGScheduler(
null
}
- // must be run listener before possible NotSerializableException
- // should be "StageSubmitted" first and then "JobEnded"
- listenerBus.post(SparkListenerStageSubmitted(stageToInfos(stage), properties))
-
if (tasks.size > 0) {
+ runningStages += stage
+ // SparkListenerStageSubmitted should be posted before testing whether tasks are
+ // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
+ // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
+ // event.
+ listenerBus.post(SparkListenerStageSubmitted(stageToInfos(stage), properties))
+
// Preemptively serialize a task to make sure it can be serialized. We are catching this
// exception here because it would be fairly hard to catch the non-serializable exception
// down the road, where we have several different implementations for local scheduler and
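The comment added in this hunk pins down the invariant being enforced: a SparkListenerStageCompleted event must always be preceded by a matching SparkListenerStageSubmitted, even when the stage fails fast because a task cannot be serialized. A minimal sketch of a listener that would catch a violation of that ordering (the listener class and its registration are illustrative, not part of this patch):

import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerStageSubmitted}

// Illustrative only: fails if a StageCompleted event arrives for a stage that
// was never announced via StageSubmitted -- the ordering bug that moving the
// listenerBus.post() call above prevents.
class StageOrderingListener extends SparkListener {
  private val submittedStages = mutable.Set[Int]()

  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit =
    submittedStages += event.stageInfo.stageId

  override def onStageCompleted(event: SparkListenerStageCompleted): Unit =
    require(submittedStages.contains(event.stageInfo.stageId),
      s"StageCompleted for stage ${event.stageInfo.stageId} arrived before StageSubmitted")
}

// Registered with an existing SparkContext via: sc.addSparkListener(new StageOrderingListener)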
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 9f498d579a..44dd1e092a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -37,6 +37,29 @@ class BuggyDAGEventProcessActor extends Actor {
}
}
+/**
+ * An RDD for passing to DAGScheduler. These RDDs will use the dependencies and
+ * preferredLocations (if any) that are passed to them. They are deliberately not executable
+ * so we can test that DAGScheduler does not try to execute RDDs locally.
+ */
+class MyRDD(
+ sc: SparkContext,
+ numPartitions: Int,
+ dependencies: List[Dependency[_]],
+ locations: Seq[Seq[String]] = Nil) extends RDD[(Int, Int)](sc, dependencies) with Serializable {
+ override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
+ throw new RuntimeException("should not be reached")
+ override def getPartitions = (0 until numPartitions).map(i => new Partition {
+ override def index = i
+ }).toArray
+ override def getPreferredLocations(split: Partition): Seq[String] =
+ if (locations.isDefinedAt(split.index))
+ locations(split.index)
+ else
+ Nil
+ override def toString: String = "DAGSchedulerSuiteRDD " + id
+}
+
class DAGSchedulerSuiteDummyException extends Exception
class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
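A note on the refactor in the surrounding hunks: MyRDD moves from an anonymous subclass built by the makeRdd helper to a top-level class that mixes in Serializable. A plausible motivation, given the new pre-serialization check above: an anonymous RDD subclass created inside the suite can keep a compiler-generated $outer reference to the enclosing, non-serializable DAGSchedulerSuite instance, so tasks built from it would now be rejected. A hypothetical sketch of that capture problem, independent of Spark (the Outer/inner names are illustrative):

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

class Outer {                          // stands in for the test suite: not Serializable
  val tag = "not serializable state"
  // The anonymous class uses a member of Outer, so it must retain Outer.this:
  val inner: Serializable = new Serializable {
    override def toString = tag
  }
}

object OuterCaptureDemo {
  def main(args: Array[String]): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try out.writeObject(new Outer().inner)   // drags the Outer instance along
    catch { case e: NotSerializableException => println(s"as expected: $e") }
  }
}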
@@ -148,34 +171,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
* Type of RDD we use for testing. Note that we should never call the real RDD compute methods.
* This is a pair RDD type so it can always be used in ShuffleDependencies.
*/
- type MyRDD = RDD[(Int, Int)]
-
- /**
- * Create an RDD for passing to DAGScheduler. These RDDs will use the dependencies and
- * preferredLocations (if any) that are passed to them. They are deliberately not executable
- * so we can test that DAGScheduler does not try to execute RDDs locally.
- */
- private def makeRdd(
- numPartitions: Int,
- dependencies: List[Dependency[_]],
- locations: Seq[Seq[String]] = Nil
- ): MyRDD = {
- val maxPartition = numPartitions - 1
- val newRDD = new MyRDD(sc, dependencies) {
- override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
- throw new RuntimeException("should not be reached")
- override def getPartitions = (0 to maxPartition).map(i => new Partition {
- override def index = i
- }).toArray
- override def getPreferredLocations(split: Partition): Seq[String] =
- if (locations.isDefinedAt(split.index))
- locations(split.index)
- else
- Nil
- override def toString: String = "DAGSchedulerSuiteRDD " + id
- }
- newRDD
- }
+ type PairOfIntsRDD = RDD[(Int, Int)]
/**
* Process the supplied event as if it were the top of the DAGScheduler event queue, expecting
@@ -234,19 +230,19 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
override def taskSucceeded(partition: Int, value: Any) = numResults += 1
override def jobFailed(exception: Exception) = throw exception
}
- submit(makeRdd(0, Nil), Array(), listener = fakeListener)
+ submit(new MyRDD(sc, 0, Nil), Array(), listener = fakeListener)
assert(numResults === 0)
}
test("run trivial job") {
- submit(makeRdd(1, Nil), Array(0))
+ submit(new MyRDD(sc, 1, Nil), Array(0))
complete(taskSets(0), List((Success, 42)))
assert(results === Map(0 -> 42))
assertDataStructuresEmpty
}
test("local job") {
- val rdd = new MyRDD(sc, Nil) {
+ val rdd = new PairOfIntsRDD(sc, Nil) {
override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
Array(42 -> 0).iterator
override def getPartitions = Array( new Partition { override def index = 0 } )
@@ -260,7 +256,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
}
test("local job oom") {
- val rdd = new MyRDD(sc, Nil) {
+ val rdd = new PairOfIntsRDD(sc, Nil) {
override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
throw new java.lang.OutOfMemoryError("test local job oom")
override def getPartitions = Array( new Partition { override def index = 0 } )
@@ -274,8 +270,8 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
}
test("run trivial job w/ dependency") {
- val baseRdd = makeRdd(1, Nil)
- val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd)))
+ val baseRdd = new MyRDD(sc, 1, Nil)
+ val finalRdd = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd)))
submit(finalRdd, Array(0))
complete(taskSets(0), Seq((Success, 42)))
assert(results === Map(0 -> 42))
@@ -283,8 +279,8 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
}
test("cache location preferences w/ dependency") {
- val baseRdd = makeRdd(1, Nil)
- val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd)))
+ val baseRdd = new MyRDD(sc, 1, Nil)
+ val finalRdd = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd)))
cacheLocations(baseRdd.id -> 0) =
Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))
submit(finalRdd, Array(0))
@@ -295,8 +291,22 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
assertDataStructuresEmpty
}
+ test("unserializable task") {
+ val unserializableRdd = new MyRDD(sc, 1, Nil) {
+ class UnserializableClass
+ val unserializable = new UnserializableClass
+ }
+ submit(unserializableRdd, Array(0))
+ assert(failure.getMessage.startsWith(
+ "Job aborted due to stage failure: Task not serializable:"))
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+ assert(sparkListener.failedStages.contains(0))
+ assert(sparkListener.failedStages.size === 1)
+ assertDataStructuresEmpty
+ }
+
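The new test above exercises exactly the path that motivated moving the post() call: the stage is submitted, its single task fails the pre-serialization check, and the stage is marked failed before any task launches. For comparison, a hedged sketch of how the same failure surfaces to a user against a real SparkContext (the action and the captured lock object are illustrative, and the exact message prefix can vary by Spark version):

import org.apache.spark.{SparkConf, SparkContext, SparkException}

object NotSerializableDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("demo"))
    val lock = new Object // a plain Object is not Serializable
    try {
      // The closure captures `lock`, so the task fails the serializability
      // check before it is ever launched on an executor.
      sc.parallelize(1 to 10).foreach(i => lock.synchronized { println(i) })
    } catch {
      case e: SparkException =>
        // Carries the same "Task not serializable" complaint the test asserts on,
        // e.g. "Job aborted due to stage failure: Task not serializable: ..."
        println(e.getMessage)
    } finally {
      sc.stop()
    }
  }
}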
test("trivial job failure") {
- submit(makeRdd(1, Nil), Array(0))
+ submit(new MyRDD(sc, 1, Nil), Array(0))
failed(taskSets(0), "some failure")
assert(failure.getMessage === "Job aborted due to stage failure: some failure")
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
@@ -306,7 +316,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
}
test("trivial job cancellation") {
- val rdd = makeRdd(1, Nil)
+ val rdd = new MyRDD(sc, 1, Nil)
val jobId = submit(rdd, Array(0))
cancel(jobId)
assert(failure.getMessage === s"Job $jobId cancelled ")
@@ -347,8 +357,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
}
dagEventProcessTestActor = TestActorRef[DAGSchedulerEventProcessActor](
Props(classOf[DAGSchedulerEventProcessActor], noKillScheduler))(system)
- val rdd = makeRdd(1, Nil)
- val jobId = submit(rdd, Array(0))
+ val jobId = submit(new MyRDD(sc, 1, Nil), Array(0))
cancel(jobId)
// Because the job wasn't actually cancelled, we shouldn't have received a failure message.
assert(failure === null)
@@ -364,10 +373,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
}
test("run trivial shuffle") {
- val shuffleMapRdd = makeRdd(2, Nil)
+ val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
val shuffleId = shuffleDep.shuffleId
- val reduceRdd = makeRdd(1, List(shuffleDep))
+ val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
submit(reduceRdd, Array(0))
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
@@ -380,10 +389,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
}
test("run trivial shuffle with fetch failure") {
- val shuffleMapRdd = makeRdd(2, Nil)
+ val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
val shuffleId = shuffleDep.shuffleId
- val reduceRdd = makeRdd(2, List(shuffleDep))
+ val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
@@ -406,10 +415,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
}
test("ignore late map task completions") {
- val shuffleMapRdd = makeRdd(2, Nil)
+ val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
val shuffleId = shuffleDep.shuffleId
- val reduceRdd = makeRdd(2, List(shuffleDep))
+ val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))
// pretend we were told hostA went away
val oldEpoch = mapOutputTracker.getEpoch
@@ -435,9 +444,9 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
}
test("run shuffle with map stage failure") {
- val shuffleMapRdd = makeRdd(2, Nil)
+ val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
- val reduceRdd = makeRdd(2, List(shuffleDep))
+ val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))
// Fail the map stage. This should cause the entire job to fail.
@@ -472,13 +481,13 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
* without shuffleMapRdd1.
*/
test("failure of stage used by two jobs") {
- val shuffleMapRdd1 = makeRdd(2, Nil)
+ val shuffleMapRdd1 = new MyRDD(sc, 2, Nil)
val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, null)
- val shuffleMapRdd2 = makeRdd(2, Nil)
+ val shuffleMapRdd2 = new MyRDD(sc, 2, Nil)
val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, null)
- val reduceRdd1 = makeRdd(2, List(shuffleDep1))
- val reduceRdd2 = makeRdd(2, List(shuffleDep1, shuffleDep2))
+ val reduceRdd1 = new MyRDD(sc, 2, List(shuffleDep1))
+ val reduceRdd2 = new MyRDD(sc, 2, List(shuffleDep1, shuffleDep2))
// We need to make our own listeners for this test, since by default submit uses the same
// listener for all jobs, and here we want to capture the failure for each job separately.
@@ -511,10 +520,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
}
test("run trivial shuffle with out-of-band failure and retry") {
- val shuffleMapRdd = makeRdd(2, Nil)
+ val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
val shuffleId = shuffleDep.shuffleId
- val reduceRdd = makeRdd(1, List(shuffleDep))
+ val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
submit(reduceRdd, Array(0))
// blockManagerMaster.removeExecutor("exec-hostA")
// pretend we were told hostA went away
@@ -534,11 +543,11 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
}
test("recursive shuffle failures") {
- val shuffleOneRdd = makeRdd(2, Nil)
+ val shuffleOneRdd = new MyRDD(sc, 2, Nil)
val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
- val shuffleTwoRdd = makeRdd(2, List(shuffleDepOne))
+ val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne))
val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
- val finalRdd = makeRdd(1, List(shuffleDepTwo))
+ val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo))
submit(finalRdd, Array(0))
// have the first stage complete normally
complete(taskSets(0), Seq(
@@ -563,11 +572,11 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
}
test("cached post-shuffle") {
- val shuffleOneRdd = makeRdd(2, Nil)
+ val shuffleOneRdd = new MyRDD(sc, 2, Nil)
val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
- val shuffleTwoRdd = makeRdd(2, List(shuffleDepOne))
+ val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne))
val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
- val finalRdd = makeRdd(1, List(shuffleDepTwo))
+ val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo))
submit(finalRdd, Array(0))
cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD"))
cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC"))