author     Ahir Reddy <ahirreddy@gmail.com>        2014-04-24 20:21:10 -0700
committer  Matei Zaharia <matei@databricks.com>    2014-04-24 20:21:10 -0700
commit     e53eb4f0159ebd4d72c4bbc3586fdfc66ccacab7 (patch)
tree       b156c7c7cc8e5b3d3876e0c8268f4d979541cacc
parent     ee6f7e22a449837864072e3cd2b6696005f134f1 (diff)
[SPARK-986]: Job cancellation for PySpark
* Additions to the PySpark API to cancel jobs
* Monitor Thread in PythonRDD to kill Python workers if a task is interrupted

Author: Ahir Reddy <ahirreddy@gmail.com>

Closes #541 from ahirreddy/python-cancel and squashes the following commits:

dfdf447 [Ahir Reddy] Changed success -> completed and made logging message clearer
6c860ab [Ahir Reddy] PR Comments
4b4100a [Ahir Reddy] Success flag
adba6ed [Ahir Reddy] Destroy python workers
27a2f8f [Ahir Reddy] Start the writer thread...
d422f7b [Ahir Reddy] Remove unnecessary vals
adda337 [Ahir Reddy] Busy wait on the context.interrupted flag, and then kill the python worker
d9e472f [Ahir Reddy] Revert "removed unnecessary vals"
5b9cae5 [Ahir Reddy] removed unnecessary vals
07b54d9 [Ahir Reddy] Fix canceling unit test
8ae9681 [Ahir Reddy] Don't interrupt worker
7722342 [Ahir Reddy] Monitor Thread for python workers
db04e16 [Ahir Reddy] Added canceling api to PySpark
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala              |  8
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala  | 30
-rw-r--r--  python/pyspark/context.py                                        | 52
3 files changed, 86 insertions(+), 4 deletions(-)
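
As a quick, hedged illustration of the API this patch adds (assuming an already-running SparkContext named sc; this mirrors the doctest added to python/pyspark/context.py below), one thread starts a job under a named group while another cancels that group:

import threading
import time

def run_job():
    # Tag everything started from this thread with a cancellable group id.
    sc.setJobGroup("job_to_cancel", "a long job we plan to cancel")
    try:
        sc.parallelize(range(10)).map(lambda x: time.sleep(100) or x).collect()
    except Exception:
        print("job was cancelled")

job_thread = threading.Thread(target=run_job)
job_thread.start()
time.sleep(5)
sc.cancelJobGroup("job_to_cancel")  # new in this patch; cancelAllJobs() is added as well
job_thread.join()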
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 915315ed74..bea435ec34 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -97,6 +97,14 @@ class SparkEnv (
pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create()
}
}
+
+ private[spark]
+ def destroyPythonWorker(pythonExec: String, envVars: Map[String, String]) {
+ synchronized {
+ val key = (pythonExec, envVars)
+ pythonWorkers(key).stop()
+ }
+ }
}
object SparkEnv extends Logging {
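
As a side note, here is a minimal Python sketch (not Spark's Scala code; the factory callable and the worker object's stop() method are hypothetical stand-ins) of the pattern this hunk follows: workers are keyed by (pythonExec, envVars), and creation and teardown go through the same lock.

import threading

class WorkerRegistry(object):
    def __init__(self, factory):
        self._factory = factory        # callable: (python_exec, env_vars) -> worker
        self._lock = threading.Lock()
        self._workers = {}

    def create_worker(self, python_exec, env_vars):
        key = (python_exec, frozenset(env_vars.items()))
        with self._lock:
            if key not in self._workers:
                self._workers[key] = self._factory(python_exec, env_vars)
            return self._workers[key]

    def destroy_worker(self, python_exec, env_vars):
        # Counterpart of the new destroyPythonWorker: resolve the same key under the
        # same lock, then stop the worker it maps to.
        key = (python_exec, frozenset(env_vars.items()))
        with self._lock:
            worker = self._workers.pop(key, None)
        if worker is not None:
            worker.stop()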
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 0d71fdbb03..1498b017a7 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -110,13 +110,41 @@ private[spark] class PythonRDD[T: ClassTag](
}
}.start()
+ // Necessary to distinguish between a task that has failed and a task that is finished
+ @volatile var complete: Boolean = false
+
+ // It is necessary to have a monitor thread for python workers if the user cancels with
+ // interrupts disabled. In that case we will need to explicitly kill the worker, otherwise the
+ // threads can block indefinitely.
+ new Thread(s"Worker Monitor for $pythonExec") {
+ override def run() {
+ // Kill the worker if the task is interrupted or completed
+ // When a python task completes, the context is always set to interrupted
+ while (!context.interrupted) {
+ Thread.sleep(2000)
+ }
+ if (!complete) {
+ try {
+ logWarning("Incomplete task interrupted: Attempting to kill Python Worker")
+ env.destroyPythonWorker(pythonExec, envVars.toMap)
+ } catch {
+ case e: Exception =>
+ logError("Exception when trying to kill worker", e)
+ }
+ }
+ }
+ }.start()
+
/*
* Partial fix for SPARK-1019: Attempts to stop reading the input stream since
* other completion callbacks might invalidate the input. Because interruption
* is not synchronous this still leaves a potential race where the interruption is
* processed only after the stream becomes invalid.
*/
- context.addOnCompleteCallback(() => context.interrupted = true)
+ context.addOnCompleteCallback{ () =>
+ complete = true // Indicate that the task has completed successfully
+ context.interrupted = true
+ }
// Return an iterator that reads lines from the process's stdout
val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
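
A minimal Python sketch of the monitor-thread pattern added above (illustrative only, not the Scala implementation in this hunk; the context dict and destroy_worker callable stand in for TaskContext and SparkEnv.destroyPythonWorker): poll the interrupted flag, and tear the worker down only if the task was interrupted without completing.

import threading
import time

def start_worker_monitor(context, destroy_worker, poll_seconds=2.0):
    # context: dict with boolean "interrupted" and "complete" flags.
    # destroy_worker: zero-argument callable that kills the external worker.
    def run():
        # Busy-wait until the task is flagged as interrupted; normal completion also
        # sets the flag (via the on-complete callback), so "complete" tells them apart.
        while not context["interrupted"]:
            time.sleep(poll_seconds)
        if not context["complete"]:
            try:
                destroy_worker()
            except Exception as e:
                print("Exception when trying to kill worker: %s" % e)

    monitor = threading.Thread(target=run, name="Worker Monitor")
    monitor.daemon = True
    monitor.start()
    return monitor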
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index f63cc4a55f..c74dc5fd4f 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -429,7 +429,7 @@ class SparkContext(object):
storageLevel.deserialized,
storageLevel.replication)
- def setJobGroup(self, groupId, description):
+ def setJobGroup(self, groupId, description, interruptOnCancel=False):
"""
Assigns a group ID to all the jobs started by this thread until the group ID is set to a
different value or cleared.
@@ -437,8 +437,41 @@ class SparkContext(object):
Often, a unit of execution in an application consists of multiple Spark actions or jobs.
Application programmers can use this method to group all those jobs together and give a
group description. Once set, the Spark web UI will associate such jobs with this group.
- """
- self._jsc.setJobGroup(groupId, description)
+
+ The application can use L{SparkContext.cancelJobGroup} to cancel all
+ running jobs in this group.
+
+ >>> import thread, threading
+ >>> from time import sleep
+ >>> result = "Not Set"
+ >>> lock = threading.Lock()
+ >>> def map_func(x):
+ ... sleep(100)
+ ... return x * x
+ >>> def start_job(x):
+ ... global result
+ ... try:
+ ... sc.setJobGroup("job_to_cancel", "some description")
+ ... result = sc.parallelize(range(x)).map(map_func).collect()
+ ... except Exception as e:
+ ... result = "Cancelled"
+ ... lock.release()
+ >>> def stop_job():
+ ... sleep(5)
+ ... sc.cancelJobGroup("job_to_cancel")
+ >>> suppress = lock.acquire()
+ >>> suppress = thread.start_new_thread(start_job, (10,))
+ >>> suppress = thread.start_new_thread(stop_job, tuple())
+ >>> suppress = lock.acquire()
+ >>> print result
+ Cancelled
+
+ If interruptOnCancel is set to true for the job group, then job cancellation will result
+ in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure
+ that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,
+ where HDFS may respond to Thread.interrupt() by marking nodes as dead.
+ """
+ self._jsc.setJobGroup(groupId, description, interruptOnCancel)
def setLocalProperty(self, key, value):
"""
@@ -460,6 +493,19 @@ class SparkContext(object):
"""
return self._jsc.sc().sparkUser()
+ def cancelJobGroup(self, groupId):
+ """
+ Cancel active jobs for the specified group. See L{SparkContext.setJobGroup}
+ for more information.
+ """
+ self._jsc.sc().cancelJobGroup(groupId)
+
+ def cancelAllJobs(self):
+ """
+ Cancel all jobs that have been scheduled or are running.
+ """
+ self._jsc.sc().cancelAllJobs()
+
def _test():
import atexit
import doctest
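
Finally, a short hedged sketch of the new interruptOnCancel flag (again assuming an existing SparkContext named sc): when set to True, cancelling the group also interrupts the executor task threads, which the docstring above notes is off by default because of HDFS-1208.

# Assumes an existing SparkContext `sc`.
sc.setJobGroup("interruptible_jobs", "jobs that should stop promptly when cancelled",
               interruptOnCancel=True)
squares = sc.parallelize(range(1000)).map(lambda x: x * x).collect()
# From another thread, either of the new cancellation calls will stop the group:
#     sc.cancelJobGroup("interruptible_jobs")
#     sc.cancelAllJobs()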