aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMark Hamstra <markhamstra@gmail.com>2015-11-25 09:34:34 -0600
committerImran Rashid <irashid@cloudera.com>2015-11-25 09:34:34 -0600
commit0a5aef753e70e93d7e56054f354a52e4d4e18932 (patch)
treea079c2538fe5f91b76eb2630e351e5a99b7d37f3 /core
parentb9b6fbe89b6d1a890faa02c1a53bb670a6255362 (diff)
downloadspark-0a5aef753e70e93d7e56054f354a52e4d4e18932.tar.gz
spark-0a5aef753e70e93d7e56054f354a52e4d4e18932.tar.bz2
spark-0a5aef753e70e93d7e56054f354a52e4d4e18932.zip
[SPARK-10666][SPARK-6880][CORE] Use properties from ActiveJob associated with a Stage
This issue was addressed in https://github.com/apache/spark/pull/5494, but the fix in that PR, while safe in the sense that it will prevent the SparkContext from shutting down, misses the actual bug. The intent of `submitMissingTasks` should be understood as "submit the Tasks that are missing for the Stage, and run them as part of the ActiveJob identified by jobId". Because of a long-standing bug, the `jobId` parameter was never being used. Instead, we were trying to use the jobId with which the Stage was created -- which may no longer exist as an ActiveJob, hence the crash reported in SPARK-6880. The correct fix is to use the ActiveJob specified by the supplied jobId parameter, which is guaranteed to exist at the call sites of submitMissingTasks. This fix should be applied to all maintenance branches, since it has existed since 1.0. kayousterhout pankajarora12 Author: Mark Hamstra <markhamstra@gmail.com> Author: Imran Rashid <irashid@cloudera.com> Closes #6291 from markhamstra/SPARK-6880.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala107
2 files changed, 109 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 77a184dfe4..e01a9609b9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -946,7 +946,9 @@ class DAGScheduler(
stage.resetInternalAccumulators()
}
- val properties = jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull
+ // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
+ // with this Stage
+ val properties = jobIdToActiveJob(jobId).properties
runningStages += stage
// SparkListenerStageSubmitted should be posted before testing whether tasks are
@@ -1047,7 +1049,7 @@ class DAGScheduler(
stage.pendingPartitions ++= tasks.map(_.partitionId)
logDebug("New pending partitions: " + stage.pendingPartitions)
taskScheduler.submitTasks(new TaskSet(
- tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
+ tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
// Because we posted SparkListenerStageSubmitted earlier, we should mark
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 4d6b254552..653d41fc05 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.scheduler
+import java.util.Properties
+
import scala.collection.mutable.{ArrayBuffer, HashSet, HashMap, Map}
import scala.language.reflectiveCalls
import scala.util.control.NonFatal
@@ -262,9 +264,10 @@ class DAGSchedulerSuite
rdd: RDD[_],
partitions: Array[Int],
func: (TaskContext, Iterator[_]) => _ = jobComputeFunc,
- listener: JobListener = jobListener): Int = {
+ listener: JobListener = jobListener,
+ properties: Properties = null): Int = {
val jobId = scheduler.nextJobId.getAndIncrement()
- runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener))
+ runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener, properties))
jobId
}
@@ -1322,6 +1325,106 @@ class DAGSchedulerSuite
assertDataStructuresEmpty()
}
+ def checkJobPropertiesAndPriority(taskSet: TaskSet, expected: String, priority: Int): Unit = {
+ assert(taskSet.properties != null)
+ assert(taskSet.properties.getProperty("testProperty") === expected)
+ assert(taskSet.priority === priority)
+ }
+
+ def launchJobsThatShareStageAndCancelFirst(): ShuffleDependency[Int, Int, Nothing] = {
+ val baseRdd = new MyRDD(sc, 1, Nil)
+ val shuffleDep1 = new ShuffleDependency(baseRdd, new HashPartitioner(1))
+ val intermediateRdd = new MyRDD(sc, 1, List(shuffleDep1))
+ val shuffleDep2 = new ShuffleDependency(intermediateRdd, new HashPartitioner(1))
+ val finalRdd1 = new MyRDD(sc, 1, List(shuffleDep2))
+ val finalRdd2 = new MyRDD(sc, 1, List(shuffleDep2))
+ val job1Properties = new Properties()
+ val job2Properties = new Properties()
+ job1Properties.setProperty("testProperty", "job1")
+ job2Properties.setProperty("testProperty", "job2")
+
+ // Run jobs 1 & 2, both referencing the same stage, then cancel job1.
+ // Note that we have to submit job2 before we cancel job1 to have them actually share
+ // *Stages*, and not just shuffle dependencies, due to skipped stages (at least until
+ // we address SPARK-10193.)
+ val jobId1 = submit(finalRdd1, Array(0), properties = job1Properties)
+ val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties)
+ assert(scheduler.activeJobs.nonEmpty)
+ val testProperty1 = scheduler.jobIdToActiveJob(jobId1).properties.getProperty("testProperty")
+
+ // remove job1 as an ActiveJob
+ cancel(jobId1)
+
+ // job2 should still be running
+ assert(scheduler.activeJobs.nonEmpty)
+ val testProperty2 = scheduler.jobIdToActiveJob(jobId2).properties.getProperty("testProperty")
+ assert(testProperty1 != testProperty2)
+ // NB: This next assert isn't necessarily the "desired" behavior; it's just to document
+ // the current behavior. We've already submitted the TaskSet for stage 0 based on job1, but
+ // even though we have cancelled that job and are now running it because of job2, we haven't
+ // updated the TaskSet's properties. Changing the properties to "job2" is likely the more
+ // correct behavior.
+ val job1Id = 0 // TaskSet priority for Stages run with "job1" as the ActiveJob
+ checkJobPropertiesAndPriority(taskSets(0), "job1", job1Id)
+ complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1))))
+
+ shuffleDep1
+ }
+
+ /**
+ * Makes sure that tasks for a stage used by multiple jobs are submitted with the properties of a
+ * later, active job if they were previously run under a job that is no longer active
+ */
+ test("stage used by two jobs, the first no longer active (SPARK-6880)") {
+ launchJobsThatShareStageAndCancelFirst()
+
+ // The next check is the key for SPARK-6880. For the stage which was shared by both job1 and
+ // job2 but never had any tasks submitted for job1, the properties of job2 are now used to run
+ // the stage.
+ checkJobPropertiesAndPriority(taskSets(1), "job2", 1)
+
+ complete(taskSets(1), Seq((Success, makeMapStatus("hostA", 1))))
+ assert(taskSets(2).properties != null)
+ complete(taskSets(2), Seq((Success, 42)))
+ assert(results === Map(0 -> 42))
+ assert(scheduler.activeJobs.isEmpty)
+
+ assertDataStructuresEmpty()
+ }
+
+ /**
+ * Makes sure that tasks for a stage used by multiple jobs are submitted with the properties of a
+ * later, active job if they were previously run under a job that is no longer active, even when
+ * there are fetch failures
+ */
+ test("stage used by two jobs, some fetch failures, and the first job no longer active " +
+ "(SPARK-6880)") {
+ val shuffleDep1 = launchJobsThatShareStageAndCancelFirst()
+ val job2Id = 1 // TaskSet priority for Stages run with "job2" as the ActiveJob
+
+ // lets say there is a fetch failure in this task set, which makes us go back and
+ // run stage 0, attempt 1
+ complete(taskSets(1), Seq(
+ (FetchFailed(makeBlockManagerId("hostA"), shuffleDep1.shuffleId, 0, 0, "ignored"), null)))
+ scheduler.resubmitFailedStages()
+
+ // stage 0, attempt 1 should have the properties of job2
+ assert(taskSets(2).stageId === 0)
+ assert(taskSets(2).stageAttemptId === 1)
+ checkJobPropertiesAndPriority(taskSets(2), "job2", job2Id)
+
+ // run the rest of the stages normally, checking that they have the correct properties
+ complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
+ checkJobPropertiesAndPriority(taskSets(3), "job2", job2Id)
+ complete(taskSets(3), Seq((Success, makeMapStatus("hostA", 1))))
+ checkJobPropertiesAndPriority(taskSets(4), "job2", job2Id)
+ complete(taskSets(4), Seq((Success, 42)))
+ assert(results === Map(0 -> 42))
+ assert(scheduler.activeJobs.isEmpty)
+
+ assertDataStructuresEmpty()
+ }
+
test("run trivial shuffle with out-of-band failure and retry") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))