author     Sean Owen <sowen@cloudera.com>          2015-01-26 14:32:27 -0800
committer  Josh Rosen <joshrosen@databricks.com>   2015-01-26 14:32:27 -0800
commit     0497ea51ac345f8057d222a18dbbf8eae78f5b92 (patch)
tree       437fcf5c97bbbb55697c40c2976af5102c3ed668 /core
parent     b38034e878546a12c6d52f17fc961fd1a2453b97 (diff)
SPARK-960 [CORE] [TEST] JobCancellationSuite "two jobs sharing the same stage" is broken
This reenables and fixes this test, after addressing two issues:

- The Semaphore that was intended to be shared locally was being serialized and copied; it's now a static member in the companion object, as in other tests.
- Later changes to Spark mean that cancelling the first job will not cancel the shared stage, and therefore the second job should succeed.

Author: Sean Owen <sowen@cloudera.com>

Closes #4180 from srowen/SPARK-960 and squashes the following commits:

43da66f [Sean Owen] Fix 'two jobs sharing the same stage' test and reenable it: truly share a Semaphore locally as intended, and update expectation of failure in non-cancelled task
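The first fix turns on a closure-serialization detail that is easy to miss: a Semaphore created as a local val is captured by the RDD closure, serialized with it, and each task blocks on its own deserialized copy, so releasing the driver's instance does nothing. A static (object-level) field, by contrast, is looked up by name, and in local mode resolves to the single instance the driver holds. The sketch below is a hypothetical, self-contained illustration of that difference, not code from this change; the names Gate and SemaphoreSharingSketch are invented.

import java.util.concurrent.Semaphore

import org.apache.spark.SparkContext

// Object-level ("static") state: in local mode the tasks and the driver run in
// the same JVM, so a task closure that names Gate.sem sees this exact instance.
object Gate {
  val sem = new Semaphore(0)
}

object SemaphoreSharingSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "semaphore-sharing-sketch")

    // Pitfall fixed by the commit: a locally created Semaphore is captured by
    // the closure, serialized with it, and deserialized once per task, so each
    // task would block on its own copy and the driver's release() calls would
    // never reach it. (Not executed here -- running an action on it would hang.)
    val localSem = new Semaphore(0)
    val stuckIfRun = sc.parallelize(1 to 10, 2).map { i => localSem.acquire(); i }

    // Working pattern, as in the fixed test: tasks block on the object-level
    // semaphore, which the driver can release directly.
    Gate.sem.release(10)                       // one permit per element
    val n = sc.parallelize(1 to 10, 2)
      .map { i => Gate.sem.acquire(); i }
      .count()
    assert(n == 10)

    sc.stop()
  }
}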
Diffstat (limited to 'core')
-rw-r--r--  core/src/test/scala/org/apache/spark/JobCancellationSuite.scala  |  17
1 file changed, 9 insertions(+), 8 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 7584ae79fc..21487bc24d 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -171,11 +171,11 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
assert(jobB.get() === 100)
}
- ignore("two jobs sharing the same stage") {
+ test("two jobs sharing the same stage") {
// sem1: make sure cancel is issued after some tasks are launched
- // sem2: make sure the first stage is not finished until cancel is issued
+ // twoJobsSharingStageSemaphore:
+ // make sure the first stage is not finished until cancel is issued
val sem1 = new Semaphore(0)
- val sem2 = new Semaphore(0)
sc = new SparkContext("local[2]", "test")
sc.addSparkListener(new SparkListener {
@@ -186,7 +186,7 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
// Create two actions that would share the same stages.
val rdd = sc.parallelize(1 to 10, 2).map { i =>
- sem2.acquire()
+ JobCancellationSuite.twoJobsSharingStageSemaphore.acquire()
(i, i)
}.reduceByKey(_+_)
val f1 = rdd.collectAsync()
@@ -196,13 +196,13 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
future {
sem1.acquire()
f1.cancel()
- sem2.release(10)
+ JobCancellationSuite.twoJobsSharingStageSemaphore.release(10)
}
- // Expect both to fail now.
- // TODO: update this test when we change Spark so cancelling f1 wouldn't affect f2.
+ // Expect f1 to fail due to cancellation,
intercept[SparkException] { f1.get() }
- intercept[SparkException] { f2.get() }
+ // but f2 should not be affected
+ f2.get()
}
def testCount() {
@@ -268,4 +268,5 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
object JobCancellationSuite {
val taskStartedSemaphore = new Semaphore(0)
val taskCancelledSemaphore = new Semaphore(0)
+ val twoJobsSharingStageSemaphore = new Semaphore(0)
}
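For readers piecing the hunks together, the fixed test coordinates roughly as follows: a SparkListener releases a "task started" semaphore on the driver, the driver then cancels only the first job and opens the gate the tasks are blocked on, and because cancelling the first job no longer cancels the shared stage, the second job completes. The sketch below is a hypothetical reconstruction of that pattern, not the suite itself; the creation of f2 falls outside the quoted hunks, so a countAsync() call on the same RDD is assumed, and the names Coordination and CancellationSketch are invented.

import java.util.concurrent.Semaphore

import scala.concurrent.Await
import scala.concurrent.duration._

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}

// Object-level semaphores, reachable by name from task closures (same JVM in local mode).
object Coordination {
  val taskStarted = new Semaphore(0) // released on the driver when a task launches
  val taskGate    = new Semaphore(0) // holds the shared stage open until cancel is issued
}

object CancellationSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "cancellation-sketch")
    sc.addSparkListener(new SparkListener {
      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
        Coordination.taskStarted.release()
      }
    })

    // Two actions sharing the same shuffle stage.
    val rdd = sc.parallelize(1 to 10, 2).map { i =>
      Coordination.taskGate.acquire()          // block until the driver says go
      (i, i)
    }.reduceByKey(_ + _)

    val f1 = rdd.collectAsync()
    val f2 = rdd.countAsync()                  // assumed second action

    Coordination.taskStarted.acquire()         // wait until some tasks have launched
    f1.cancel()                                // cancel only the first job
    Coordination.taskGate.release(10)          // let the shared stage's tasks finish

    // f1 fails because it was cancelled ...
    try { Await.result(f1, 1.minute) } catch { case _: SparkException => () }
    // ... but the shared stage is not cancelled, so f2 still completes.
    val n = Await.result(f2, 1.minute)
    assert(n == 10)

    sc.stop()
  }
}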