author    Ahir Reddy <ahirreddy@gmail.com>    2014-04-24 20:21:10 -0700
committer Matei Zaharia <matei@databricks.com>    2014-04-24 20:21:10 -0700
commit    e53eb4f0159ebd4d72c4bbc3586fdfc66ccacab7 (patch)
tree      b156c7c7cc8e5b3d3876e0c8268f4d979541cacc /python/pyspark/context.py
parent    ee6f7e22a449837864072e3cd2b6696005f134f1 (diff)
[SPARK-986]: Job cancelation for PySpark
* Additions to the PySpark API to cancel jobs
* Monitor Thread in PythonRDD to kill Python workers if a task is interrupted

Author: Ahir Reddy <ahirreddy@gmail.com>

Closes #541 from ahirreddy/python-cancel and squashes the following commits:

dfdf447 [Ahir Reddy] Changed success -> completed and made logging message clearer
6c860ab [Ahir Reddy] PR Comments
4b4100a [Ahir Reddy] Success flag
adba6ed [Ahir Reddy] Destroy python workers
27a2f8f [Ahir Reddy] Start the writer thread...
d422f7b [Ahir Reddy] Remove unnecesssary vals
adda337 [Ahir Reddy] Busy wait on the ocntext.interrupted flag, and then kill the python worker
d9e472f [Ahir Reddy] Revert "removed unnecessary vals"
5b9cae5 [Ahir Reddy] removed unnecessary vals
07b54d9 [Ahir Reddy] Fix canceling unit test
8ae9681 [Ahir Reddy] Don't interrupt worker
7722342 [Ahir Reddy] Monitor Thread for python workers
db04e16 [Ahir Reddy] Added canceling api to PySpark
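For context, a minimal sketch of the cancellation pattern this commit enables (assuming an existing SparkContext named sc, e.g. from the PySpark shell; the group id, job size, and sleep durations are illustrative, not taken from the patch):

    import threading
    import time

    def run_job():
        # The job group must be set on the thread that submits the job.
        try:
            sc.setJobGroup("example_group", "long-running job to cancel")
            sc.parallelize(range(4)).map(lambda x: time.sleep(60) or x).collect()
        except Exception:
            print("job was cancelled")

    worker = threading.Thread(target=run_job)
    worker.start()

    time.sleep(5)                       # give the job time to start running
    sc.cancelJobGroup("example_group")  # cancels only jobs tagged with this group id
    worker.join()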
Diffstat (limited to 'python/pyspark/context.py')
-rw-r--r--  python/pyspark/context.py  |  52
1 file changed, 49 insertions(+), 3 deletions(-)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index f63cc4a55f..c74dc5fd4f 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -429,7 +429,7 @@ class SparkContext(object):
storageLevel.deserialized,
storageLevel.replication)
- def setJobGroup(self, groupId, description):
+ def setJobGroup(self, groupId, description, interruptOnCancel=False):
"""
Assigns a group ID to all the jobs started by this thread until the group ID is set to a
different value or cleared.
@@ -437,8 +437,41 @@ class SparkContext(object):
Often, a unit of execution in an application consists of multiple Spark actions or jobs.
Application programmers can use this method to group all those jobs together and give a
group description. Once set, the Spark web UI will associate such jobs with this group.
- """
- self._jsc.setJobGroup(groupId, description)
+
+ The application can use L{SparkContext.cancelJobGroup} to cancel all
+ running jobs in this group.
+
+ >>> import thread, threading
+ >>> from time import sleep
+ >>> result = "Not Set"
+ >>> lock = threading.Lock()
+ >>> def map_func(x):
+ ... sleep(100)
+ ... return x * x
+ >>> def start_job(x):
+ ... global result
+ ... try:
+ ... sc.setJobGroup("job_to_cancel", "some description")
+ ... result = sc.parallelize(range(x)).map(map_func).collect()
+ ... except Exception as e:
+ ... result = "Cancelled"
+ ... lock.release()
+ >>> def stop_job():
+ ... sleep(5)
+ ... sc.cancelJobGroup("job_to_cancel")
+ >>> supress = lock.acquire()
+ >>> supress = thread.start_new_thread(start_job, (10,))
+ >>> supress = thread.start_new_thread(stop_job, tuple())
+ >>> supress = lock.acquire()
+ >>> print result
+ Cancelled
+
+ If interruptOnCancel is set to true for the job group, then job cancellation will result
+ in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure
+ that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,
+ where HDFS may respond to Thread.interrupt() by marking nodes as dead.
+ """
+ self._jsc.setJobGroup(groupId, description, interruptOnCancel)
def setLocalProperty(self, key, value):
"""
@@ -460,6 +493,19 @@ class SparkContext(object):
"""
return self._jsc.sc().sparkUser()
+ def cancelJobGroup(self, groupId):
+ """
+ Cancel active jobs for the specified group. See L{SparkContext.setJobGroup}
+ for more information.
+ """
+ self._jsc.sc().cancelJobGroup(groupId)
+
+ def cancelAllJobs(self):
+ """
+ Cancel all jobs that have been scheduled or are running.
+ """
+ self._jsc.sc().cancelAllJobs()
+
def _test():
import atexit
import doctest
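For completeness, a hedged sketch of the interruptOnCancel flag documented in the setJobGroup docstring above (assuming an existing SparkContext named sc; the group id is illustrative):

    # With interruptOnCancel=True, cancelling this group also calls
    # Thread.interrupt() on the executor threads running its tasks.
    # Off by default because of HDFS-1208, where HDFS may respond to the
    # interrupt by marking nodes as dead.
    sc.setJobGroup("interruptible_group", "tasks get interrupted on cancel",
                   interruptOnCancel=True)
    # ... submit actions on this thread, then from another thread:
    # sc.cancelJobGroup("interruptible_group")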