author    Aaron Davidson <aaron@databricks.com>    2014-04-23 22:00:22 -0700
committer Reynold Xin <rxin@apache.org>            2014-04-23 22:00:22 -0700
commit    d485eecb7233dd339ae85a6f58f1c0686dd2037d
tree      0b2d0594d076536833f2d080cfe4330aae12f6a1
parent    4b2bab1d08a6b790be94717bbdd643d896d85c16
Update Java API for setJobGroup with interruptOnCancel
Also adds a unit test.

Author: Aaron Davidson <aaron@databricks.com>

Closes #522 from aarondav/cancel2 and squashes the following commits:

565c253 [Aaron Davidson] Update Java API for setJobGroup with interruptOnCancel
65b33d8 [Aaron Davidson] Add unit test for Thread interruption on cancellation
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala | 15
-rw-r--r--  core/src/test/scala/org/apache/spark/JobCancellationSuite.scala      | 36
2 files changed, 49 insertions(+), 2 deletions(-)
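
For orientation before the diff, here is a minimal sketch of how the new overload might be called through the Java-friendly API; the context constructor arguments and the group name are illustrative, not part of the commit:

    import org.apache.spark.api.java.JavaSparkContext

    // Sketch only: a local context with a hypothetical group name.
    val jsc = new JavaSparkContext("local[2]", "interrupt-demo")

    // New overload from this commit: the third argument asks Spark to call
    // Thread.interrupt() on executor threads if the group is cancelled.
    jsc.setJobGroup("demo_group", "a cancellable job group", true)

    // From another thread, cancelling the group then interrupts its tasks:
    // jsc.cancelJobGroup("demo_group")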
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index bda9272b43..8b95cda511 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -570,6 +570,21 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround
* // In a separate thread:
* sc.cancelJobGroup("some_job_to_cancel");
* }}}
+ *
+ * If interruptOnCancel is set to true for the job group, then job cancellation will result
+ * in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure
+ * that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,
+ * where HDFS may respond to Thread.interrupt() by marking nodes as dead.
+ */
+ def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean): Unit =
+ sc.setJobGroup(groupId, description, interruptOnCancel)
+
+ /**
+ * Assigns a group ID to all the jobs started by this thread until the group ID is set to a
+ * different value or cleared.
+ *
+ * @see `setJobGroup(groupId: String, description: String, interruptThread: Boolean)`.
+ * This method sets interruptOnCancel to false.
*/
def setJobGroup(groupId: String, description: String): Unit = sc.setJobGroup(groupId, description)
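
The flag's effect on a running task, sketched with a hypothetical job body (not from the commit): when the group was registered with interruptOnCancel = true, a cancelled task that is blocked in an interruptible call such as Thread.sleep is woken with an InterruptedException instead of running to completion.

    // Illustrative task body only; "demo_group" is a hypothetical name.
    sc.setJobGroup("demo_group", "interruptible work", interruptOnCancel = true)
    sc.parallelize(1 to 100, 2).map { i =>
      try {
        Thread.sleep(60000) // stands in for long, interruptible work
        i
      } catch {
        case e: InterruptedException =>
          // cancelJobGroup("demo_group") lands here when the flag is set,
          // rather than letting the sleep run its full course.
          throw e
      }
    }.count()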
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 7a39d1af9e..16cfdf11c4 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.Semaphore
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration._
import scala.concurrent.future
import org.scalatest.{BeforeAndAfter, FunSuite}
@@ -101,18 +101,50 @@ class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
}
+ // Block until both tasks of job A have started and cancel job A.
+ sem.acquire(2)
+
sc.clearJobGroup()
val jobB = sc.parallelize(1 to 100, 2).countAsync()
+ sc.cancelJobGroup("jobA")
+ val e = intercept[SparkException] { Await.result(jobA, Duration.Inf) }
+ assert(e.getMessage contains "cancel")
+
+ // Once A is cancelled, job B should finish fairly quickly.
+ assert(jobB.get() === 100)
+ }
+
+
+ test("job group with interruption") {
+ sc = new SparkContext("local[2]", "test")
+
+ // Add a listener to release the semaphore once any tasks are launched.
+ val sem = new Semaphore(0)
+ sc.addSparkListener(new SparkListener {
+ override def onTaskStart(taskStart: SparkListenerTaskStart) {
+ sem.release()
+ }
+ })
+
+ // jobA is the one to be cancelled.
+ val jobA = future {
+ sc.setJobGroup("jobA", "this is a job to be cancelled", interruptOnCancel = true)
+ sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(100000); i }.count()
+ }
// Block until both tasks of job A have started and cancel job A.
sem.acquire(2)
+
+ sc.clearJobGroup()
+ val jobB = sc.parallelize(1 to 100, 2).countAsync()
sc.cancelJobGroup("jobA")
- val e = intercept[SparkException] { Await.result(jobA, Duration.Inf) }
+ val e = intercept[SparkException] { Await.result(jobA, 5.seconds) }
assert(e.getMessage contains "cancel")
// Once A is cancelled, job B should finish fairly quickly.
assert(jobB.get() === 100)
}
+
/*
test("two jobs sharing the same stage") {
// sem1: make sure cancel is issued after some tasks are launched
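
One detail worth noting in the hunk above is that the wait on jobA changed from Duration.Inf to a bounded 5.seconds. An annotated excerpt (the reasoning in the comments is editorial, not from the commit):

    import scala.concurrent.Await
    import scala.concurrent.duration._

    // Each task sleeps for 100 s, so the bounded wait makes the suite fail
    // fast, instead of hanging indefinitely, if cancellation and the
    // interrupt it should trigger never take effect.
    val e = intercept[SparkException] { Await.result(jobA, 5.seconds) }
    assert(e.getMessage contains "cancel")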