diff options
author | Ahir Reddy <ahirreddy@gmail.com> | 2014-04-24 20:21:10 -0700 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2014-04-24 20:21:10 -0700 |
commit | e53eb4f0159ebd4d72c4bbc3586fdfc66ccacab7 (patch) | |
tree | b156c7c7cc8e5b3d3876e0c8268f4d979541cacc | |
parent | ee6f7e22a449837864072e3cd2b6696005f134f1 (diff) | |
download | spark-e53eb4f0159ebd4d72c4bbc3586fdfc66ccacab7.tar.gz spark-e53eb4f0159ebd4d72c4bbc3586fdfc66ccacab7.tar.bz2 spark-e53eb4f0159ebd4d72c4bbc3586fdfc66ccacab7.zip |
[SPARK-986]: Job cancelation for PySpark
* Additions to the PySpark API to cancel jobs
* Monitor Thread in PythonRDD to kill Python workers if a task is interrupted
Author: Ahir Reddy <ahirreddy@gmail.com>
Closes #541 from ahirreddy/python-cancel and squashes the following commits:
dfdf447 [Ahir Reddy] Changed success -> completed and made logging message clearer
6c860ab [Ahir Reddy] PR Comments
4b4100a [Ahir Reddy] Success flag
adba6ed [Ahir Reddy] Destroy python workers
27a2f8f [Ahir Reddy] Start the writer thread...
d422f7b [Ahir Reddy] Remove unnecesssary vals
adda337 [Ahir Reddy] Busy wait on the ocntext.interrupted flag, and then kill the python worker
d9e472f [Ahir Reddy] Revert "removed unnecessary vals"
5b9cae5 [Ahir Reddy] removed unnecessary vals
07b54d9 [Ahir Reddy] Fix canceling unit test
8ae9681 [Ahir Reddy] Don't interrupt worker
7722342 [Ahir Reddy] Monitor Thread for python workers
db04e16 [Ahir Reddy] Added canceling api to PySpark
-rw-r--r-- | core/src/main/scala/org/apache/spark/SparkEnv.scala | 8 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 30 | ||||
-rw-r--r-- | python/pyspark/context.py | 52 |
3 files changed, 86 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 915315ed74..bea435ec34 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -97,6 +97,14 @@ class SparkEnv ( pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create() } } + + private[spark] + def destroyPythonWorker(pythonExec: String, envVars: Map[String, String]) { + synchronized { + val key = (pythonExec, envVars) + pythonWorkers(key).stop() + } + } } object SparkEnv extends Logging { diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 0d71fdbb03..1498b017a7 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -110,13 +110,41 @@ private[spark] class PythonRDD[T: ClassTag]( } }.start() + // Necessary to distinguish between a task that has failed and a task that is finished + @volatile var complete: Boolean = false + + // It is necessary to have a monitor thread for python workers if the user cancels with + // interrupts disabled. In that case we will need to explicitly kill the worker, otherwise the + // threads can block indefinitely. + new Thread(s"Worker Monitor for $pythonExec") { + override def run() { + // Kill the worker if it is interrupted or completed + // When a python task completes, the context is always set to interupted + while (!context.interrupted) { + Thread.sleep(2000) + } + if (!complete) { + try { + logWarning("Incomplete task interrupted: Attempting to kill Python Worker") + env.destroyPythonWorker(pythonExec, envVars.toMap) + } catch { + case e: Exception => + logError("Exception when trying to kill worker", e) + } + } + } + }.start() + /* * Partial fix for SPARK-1019: Attempts to stop reading the input stream since * other completion callbacks might invalidate the input. Because interruption * is not synchronous this still leaves a potential race where the interruption is * processed only after the stream becomes invalid. */ - context.addOnCompleteCallback(() => context.interrupted = true) + context.addOnCompleteCallback{ () => + complete = true // Indicate that the task has completed successfully + context.interrupted = true + } // Return an iterator that read lines from the process's stdout val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize)) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index f63cc4a55f..c74dc5fd4f 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -429,7 +429,7 @@ class SparkContext(object): storageLevel.deserialized, storageLevel.replication) - def setJobGroup(self, groupId, description): + def setJobGroup(self, groupId, description, interruptOnCancel=False): """ Assigns a group ID to all the jobs started by this thread until the group ID is set to a different value or cleared. @@ -437,8 +437,41 @@ class SparkContext(object): Often, a unit of execution in an application consists of multiple Spark actions or jobs. Application programmers can use this method to group all those jobs together and give a group description. Once set, the Spark web UI will associate such jobs with this group. - """ - self._jsc.setJobGroup(groupId, description) + + The application can use L{SparkContext.cancelJobGroup} to cancel all + running jobs in this group. + + >>> import thread, threading + >>> from time import sleep + >>> result = "Not Set" + >>> lock = threading.Lock() + >>> def map_func(x): + ... sleep(100) + ... return x * x + >>> def start_job(x): + ... global result + ... try: + ... sc.setJobGroup("job_to_cancel", "some description") + ... result = sc.parallelize(range(x)).map(map_func).collect() + ... except Exception as e: + ... result = "Cancelled" + ... lock.release() + >>> def stop_job(): + ... sleep(5) + ... sc.cancelJobGroup("job_to_cancel") + >>> supress = lock.acquire() + >>> supress = thread.start_new_thread(start_job, (10,)) + >>> supress = thread.start_new_thread(stop_job, tuple()) + >>> supress = lock.acquire() + >>> print result + Cancelled + + If interruptOnCancel is set to true for the job group, then job cancellation will result + in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure + that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208, + where HDFS may respond to Thread.interrupt() by marking nodes as dead. + """ + self._jsc.setJobGroup(groupId, description, interruptOnCancel) def setLocalProperty(self, key, value): """ @@ -460,6 +493,19 @@ class SparkContext(object): """ return self._jsc.sc().sparkUser() + def cancelJobGroup(self, groupId): + """ + Cancel active jobs for the specified group. See L{SparkContext.setJobGroup} + for more information. + """ + self._jsc.sc().cancelJobGroup(groupId) + + def cancelAllJobs(self): + """ + Cancel all jobs that have been scheduled or are running. + """ + self._jsc.sc().cancelAllJobs() + def _test(): import atexit import doctest |