aboutsummaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
author: Josh Rosen <joshrosen@databricks.com> 2015-04-29 13:31:52 -0700
committer: Tathagata Das <tathagata.das1565@gmail.com> 2015-04-29 13:31:52 -0700
commit: 3a180c19a4ef165366186e23d8e8844c5baaecdd (patch)
tree: 6c9b3750cd4369daea5ad5d30e75b5cdf1252d1a /core/src/test
parent: a9c4e29950a14e32acaac547e9a0e8879fd37fc9 (diff)
download: spark-3a180c19a4ef165366186e23d8e8844c5baaecdd.tar.gz
spark-3a180c19a4ef165366186e23d8e8844c5baaecdd.tar.bz2
spark-3a180c19a4ef165366186e23d8e8844c5baaecdd.zip
[SPARK-6629] cancelJobGroup() may not work for jobs whose job groups are inherited from parent threads
When a job is submitted with a job group and that job group is inherited from a parent thread, there are multiple bugs that may prevent this job from being cancelable via `SparkContext.cancelJobGroup()`:

- When filtering jobs based on their job group properties, DAGScheduler calls `get()` instead of `getProperty()`. Unlike `getProperty()`, `get()` does not respect inheritance, so it will skip over jobs whose job group properties were inherited.
- `Properties` objects are mutable, but we do not make defensive copies / snapshots, so modifications of the parent thread's job group will cause running jobs' groups to change; this also breaks cancellation.

Both of these issues are easy to fix: use `getProperty()` and perform defensive copying.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #5288 from JoshRosen/localProperties-mutability-race and squashes the following commits:

9e29654 [Josh Rosen] Fix style issue
5d90750 [Josh Rosen] Merge remote-tracking branch 'origin/master' into localProperties-mutability-race
3f7b9e8 [Josh Rosen] Add JIRA reference; move clone into DAGScheduler
707e417 [Josh Rosen] Clone local properties to prevent mutations from breaking job cancellation.
b376114 [Josh Rosen] Fix bug that prevented jobs with inherited job group properties from being cancelled.
Diffstat (limited to 'core/src/test')
-rw-r--r-- core/src/test/scala/org/apache/spark/JobCancellationSuite.scala | 35
-rw-r--r-- core/src/test/scala/org/apache/spark/ThreadingSuite.scala | 46
2 files changed, 80 insertions, 1 deletions
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 4d3e09793f..ae17fc60e4 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -141,6 +141,41 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
assert(jobB.get() === 100)
}
+  test("inherited job group (SPARK-6629)") {
+    sc = new SparkContext("local[2]", "test")
+
+    // Add a listener to release the semaphore once any tasks are launched.
+    val sem = new Semaphore(0)
+    sc.addSparkListener(new SparkListener {
+      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        sem.release()
+      }
+    })
+
+    sc.setJobGroup("jobA", "this is a job to be cancelled")
+    @volatile var exception: Exception = null
+    val jobA = new Thread() {
+      // The job group is not set again here; it should be inherited by this thread
+      override def run(): Unit = {
+        exception = intercept[SparkException] {
+          sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
+        }
+      }
+    }
+    jobA.start()
+
+    // Wait (bounded, to fail rather than hang) until both tasks of job A start; then cancel.
+    assert(sem.tryAcquire(2, 10, java.util.concurrent.TimeUnit.SECONDS), "tasks did not start")
+    sc.cancelJobGroup("jobA")
+    jobA.join(10000)
+    assert(!jobA.isAlive)
+    assert(exception != null && exception.getMessage.contains("cancel"), "job A was not cancelled")
+
+    // Once A is cancelled, job B should finish fairly quickly.
+    val jobB = sc.parallelize(1 to 100, 2).countAsync()
+    assert(jobB.get() === 100)
+  }
+
test("job group with interruption") {
sc = new SparkContext("local[2]", "test")
diff --git a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
index b5383d553a..10917c866c 100644
--- a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
@@ -17,10 +17,11 @@
package org.apache.spark
-import java.util.concurrent.Semaphore
+import java.util.concurrent.{TimeUnit, Semaphore}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
+import org.apache.spark.scheduler._
import org.scalatest.FunSuite
/**
@@ -189,4 +190,47 @@ class ThreadingSuite extends FunSuite with LocalSparkContext {
assert(sc.getLocalProperty("test") === "parent")
assert(sc.getLocalProperty("Foo") === null)
}
+
+  test("mutations to local properties should not affect submitted jobs (SPARK-6629)") {
+    val jobStarted = new Semaphore(0)
+    val jobEnded = new Semaphore(0)
+    @volatile var jobResult: JobResult = null
+
+    sc = new SparkContext("local", "test")
+    sc.setJobGroup("originalJobGroupId", "description")
+    sc.addSparkListener(new SparkListener {
+      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+        jobStarted.release()
+      }
+      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+        jobResult = jobEnd.jobResult
+        jobEnded.release()
+      }
+    })
+
+    // Create a new thread which will inherit the current thread's properties
+    val thread = new Thread() {
+      override def run(): Unit = {
+        assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) === "originalJobGroupId")
+        // Sleeps for a total of 10 seconds, but allows cancellation to interrupt the task
+        try {
+          sc.parallelize(1 to 100).foreach { _ =>
+            Thread.sleep(100)
+          }
+        } catch {
+          case _: SparkException => // ignored so that we don't print noise in test logs
+        }
+      }
+    }
+    thread.start()
+    // Wait for the job to start, then mutate the original properties, which should have been
+    // inherited by the running job but hopefully defensively copied or snapshotted:
+    assert(jobStarted.tryAcquire(10, TimeUnit.SECONDS), "job did not start within 10 seconds")
+    sc.setJobGroup("modifiedJobGroupId", "description")
+    // Canceling the original job group should cancel the running job. In other words, the
+    // modification of the properties object should not affect the properties of running jobs
+    sc.cancelJobGroup("originalJobGroupId")
+    assert(jobEnded.tryAcquire(10, TimeUnit.SECONDS), "job did not end within 10 seconds")
+    assert(jobResult.isInstanceOf[JobFailed])
+  }
}