aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 6
-rw-r--r-- core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 107
2 files changed, 109 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 77a184dfe4..e01a9609b9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -946,7 +946,9 @@ class DAGScheduler(
stage.resetInternalAccumulators()
}
- val properties = jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull
+ // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
+ // with this Stage
+ val properties = jobIdToActiveJob(jobId).properties
runningStages += stage
// SparkListenerStageSubmitted should be posted before testing whether tasks are
@@ -1047,7 +1049,7 @@ class DAGScheduler(
stage.pendingPartitions ++= tasks.map(_.partitionId)
logDebug("New pending partitions: " + stage.pendingPartitions)
taskScheduler.submitTasks(new TaskSet(
- tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
+ tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
// Because we posted SparkListenerStageSubmitted earlier, we should mark
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 4d6b254552..653d41fc05 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.scheduler
+import java.util.Properties
+
import scala.collection.mutable.{ArrayBuffer, HashSet, HashMap, Map}
import scala.language.reflectiveCalls
import scala.util.control.NonFatal
@@ -262,9 +264,10 @@ class DAGSchedulerSuite
rdd: RDD[_],
partitions: Array[Int],
func: (TaskContext, Iterator[_]) => _ = jobComputeFunc,
- listener: JobListener = jobListener): Int = {
+ listener: JobListener = jobListener,
+ properties: Properties = null): Int = {
val jobId = scheduler.nextJobId.getAndIncrement()
- runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener))
+ runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener, properties))
jobId
}
@@ -1322,6 +1325,106 @@ class DAGSchedulerSuite
assertDataStructuresEmpty()
}
+ def checkJobPropertiesAndPriority(taskSet: TaskSet, expected: String, priority: Int): Unit = {
+ assert(taskSet.properties != null)
+ assert(taskSet.properties.getProperty("testProperty") === expected)
+ assert(taskSet.priority === priority)
+ }
+
+ def launchJobsThatShareStageAndCancelFirst(): ShuffleDependency[Int, Int, Nothing] = {
+ val baseRdd = new MyRDD(sc, 1, Nil)
+ val shuffleDep1 = new ShuffleDependency(baseRdd, new HashPartitioner(1))
+ val intermediateRdd = new MyRDD(sc, 1, List(shuffleDep1))
+ val shuffleDep2 = new ShuffleDependency(intermediateRdd, new HashPartitioner(1))
+ val finalRdd1 = new MyRDD(sc, 1, List(shuffleDep2))
+ val finalRdd2 = new MyRDD(sc, 1, List(shuffleDep2))
+ val job1Properties = new Properties()
+ val job2Properties = new Properties()
+ job1Properties.setProperty("testProperty", "job1")
+ job2Properties.setProperty("testProperty", "job2")
+
+ // Run jobs 1 & 2, both referencing the same stage, then cancel job1.
+ // Note that we have to submit job2 before we cancel job1 to have them actually share
+ // *Stages*, and not just shuffle dependencies, due to skipped stages (at least until
+ // we address SPARK-10193.)
+ val jobId1 = submit(finalRdd1, Array(0), properties = job1Properties)
+ val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties)
+ assert(scheduler.activeJobs.nonEmpty)
+ val testProperty1 = scheduler.jobIdToActiveJob(jobId1).properties.getProperty("testProperty")
+
+ // remove job1 as an ActiveJob
+ cancel(jobId1)
+
+ // job2 should still be running
+ assert(scheduler.activeJobs.nonEmpty)
+ val testProperty2 = scheduler.jobIdToActiveJob(jobId2).properties.getProperty("testProperty")
+ assert(testProperty1 != testProperty2)
+ // NB: This next assert isn't necessarily the "desired" behavior; it's just to document
+ // the current behavior. We've already submitted the TaskSet for stage 0 based on job1, but
+ // even though we have cancelled that job and are now running the stage because of job2, we
+ // haven't updated the TaskSet's properties. Changing the properties to "job2" is likely the
+ // more correct behavior.
+ val job1Id = 0 // TaskSet priority for Stages run with "job1" as the ActiveJob
+ checkJobPropertiesAndPriority(taskSets(0), "job1", job1Id)
+ complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1))))
+
+ shuffleDep1
+ }
+
+ /**
+ * Makes sure that tasks for a stage used by multiple jobs are submitted with the properties of a
+ * later, active job if they were previously run under a job that is no longer active
+ */
+ test("stage used by two jobs, the first no longer active (SPARK-6880)") {
+ launchJobsThatShareStageAndCancelFirst()
+
+ // The next check is the key for SPARK-6880. For the stage which was shared by both job1 and
+ // job2 but never had any tasks submitted for job1, the properties of job2 are now used to run
+ // the stage.
+ checkJobPropertiesAndPriority(taskSets(1), "job2", 1)
+
+ complete(taskSets(1), Seq((Success, makeMapStatus("hostA", 1))))
+ assert(taskSets(2).properties != null)
+ complete(taskSets(2), Seq((Success, 42)))
+ assert(results === Map(0 -> 42))
+ assert(scheduler.activeJobs.isEmpty)
+
+ assertDataStructuresEmpty()
+ }
+
+ /**
+ * Makes sure that tasks for a stage used by multiple jobs are submitted with the properties of a
+ * later, active job if they were previously run under a job that is no longer active, even when
+ * there are fetch failures
+ */
+ test("stage used by two jobs, some fetch failures, and the first job no longer active " +
+ "(SPARK-6880)") {
+ val shuffleDep1 = launchJobsThatShareStageAndCancelFirst()
+ val job2Id = 1 // TaskSet priority for Stages run with "job2" as the ActiveJob
+
+ // let's say there is a fetch failure in this task set, which makes us go back and
+ // run stage 0, attempt 1
+ complete(taskSets(1), Seq(
+ (FetchFailed(makeBlockManagerId("hostA"), shuffleDep1.shuffleId, 0, 0, "ignored"), null)))
+ scheduler.resubmitFailedStages()
+
+ // stage 0, attempt 1 should have the properties of job2
+ assert(taskSets(2).stageId === 0)
+ assert(taskSets(2).stageAttemptId === 1)
+ checkJobPropertiesAndPriority(taskSets(2), "job2", job2Id)
+
+ // run the rest of the stages normally, checking that they have the correct properties
+ complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
+ checkJobPropertiesAndPriority(taskSets(3), "job2", job2Id)
+ complete(taskSets(3), Seq((Success, makeMapStatus("hostA", 1))))
+ checkJobPropertiesAndPriority(taskSets(4), "job2", job2Id)
+ complete(taskSets(4), Seq((Success, 42)))
+ assert(results === Map(0 -> 42))
+ assert(scheduler.activeJobs.isEmpty)
+
+ assertDataStructuresEmpty()
+ }
+
test("run trivial shuffle with out-of-band failure and retry") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))