Diffstat (limited to 'core/src/test/scala/org/apache/spark/JobCancellationSuite.scala')
-rw-r--r--    core/src/test/scala/org/apache/spark/JobCancellationSuite.scala    35
1 file changed, 35 insertions, 0 deletions
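
Context for the hunk below: SPARK-6629 concerns cancelling jobs by job group when the jobs are submitted from child threads, which inherit the parent thread's job group. The following standalone sketch (not part of this commit; the object name, group id, and sleep-based wait are illustrative assumptions) shows the same mechanism outside a test harness. The actual test below replaces the crude sleep with a SparkListener/Semaphore handshake.

import org.apache.spark.{SparkContext, SparkException}

object JobGroupInheritanceSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "sketch")
    // Properties set here are inherited by threads started afterwards.
    sc.setJobGroup("groupA", "jobs to cancel")

    val worker = new Thread {
      override def run(): Unit = {
        try {
          // Submitted from the child thread, but still tagged with "groupA".
          sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
        } catch {
          case e: SparkException => println("cancelled: " + e.getMessage)
        }
      }
    }
    worker.start()
    Thread.sleep(1000)          // crude stand-in for the test's listener/semaphore handshake
    sc.cancelJobGroup("groupA") // reaches the job submitted by the child thread as well
    worker.join()
    sc.stop()
  }
}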
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 4d3e09793f..ae17fc60e4 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -141,6 +141,41 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
     assert(jobB.get() === 100)
   }
+ test("inherited job group (SPARK-6629)") {
+ sc = new SparkContext("local[2]", "test")
+
+ // Add a listener to release the semaphore once any tasks are launched.
+ val sem = new Semaphore(0)
+ sc.addSparkListener(new SparkListener {
+ override def onTaskStart(taskStart: SparkListenerTaskStart) {
+ sem.release()
+ }
+ })
+
+ sc.setJobGroup("jobA", "this is a job to be cancelled")
+ @volatile var exception: Exception = null
+ val jobA = new Thread() {
+ // The job group should be inherited by this thread
+ override def run(): Unit = {
+ exception = intercept[SparkException] {
+ sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
+ }
+ }
+ }
+ jobA.start()
+
+ // Block until both tasks of job A have started and cancel job A.
+ sem.acquire(2)
+ sc.cancelJobGroup("jobA")
+ jobA.join(10000)
+ assert(!jobA.isAlive)
+ assert(exception.getMessage contains "cancel")
+
+ // Once A is cancelled, job B should finish fairly quickly.
+ val jobB = sc.parallelize(1 to 100, 2).countAsync()
+ assert(jobB.get() === 100)
+ }
+
test("job group with interruption") {
sc = new SparkContext("local[2]", "test")