aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/test/scala/org/apache/spark/JobCancellationSuite.scala17
1 files changed, 9 insertions, 8 deletions
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 7584ae79fc..21487bc24d 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -171,11 +171,11 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
assert(jobB.get() === 100)
}
- ignore("two jobs sharing the same stage") {
+ test("two jobs sharing the same stage") {
// sem1: make sure cancel is issued after some tasks are launched
- // sem2: make sure the first stage is not finished until cancel is issued
+ // twoJobsSharingStageSemaphore:
+ // make sure the first stage is not finished until cancel is issued
val sem1 = new Semaphore(0)
- val sem2 = new Semaphore(0)
sc = new SparkContext("local[2]", "test")
sc.addSparkListener(new SparkListener {
@@ -186,7 +186,7 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
// Create two actions that would share the some stages.
val rdd = sc.parallelize(1 to 10, 2).map { i =>
- sem2.acquire()
+ JobCancellationSuite.twoJobsSharingStageSemaphore.acquire()
(i, i)
}.reduceByKey(_+_)
val f1 = rdd.collectAsync()
@@ -196,13 +196,13 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
future {
sem1.acquire()
f1.cancel()
- sem2.release(10)
+ JobCancellationSuite.twoJobsSharingStageSemaphore.release(10)
}
- // Expect both to fail now.
- // TODO: update this test when we change Spark so cancelling f1 wouldn't affect f2.
+ // Expect f1 to fail due to cancellation,
intercept[SparkException] { f1.get() }
- intercept[SparkException] { f2.get() }
+ // but f2 should not be affected
+ f2.get()
}
def testCount() {
@@ -268,4 +268,5 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
object JobCancellationSuite {
val taskStartedSemaphore = new Semaphore(0)
val taskCancelledSemaphore = new Semaphore(0)
+ val twoJobsSharingStageSemaphore = new Semaphore(0)
}