author    Ahir Reddy <ahirreddy@gmail.com>  2014-04-24 20:21:10 -0700
committer Matei Zaharia <matei@databricks.com>  2014-04-24 20:21:10 -0700
commit    e53eb4f0159ebd4d72c4bbc3586fdfc66ccacab7 (patch)
tree      b156c7c7cc8e5b3d3876e0c8268f4d979541cacc /core
parent    ee6f7e22a449837864072e3cd2b6696005f134f1 (diff)
[SPARK-986]: Job cancellation for PySpark
* Additions to the PySpark API to cancel jobs
* Monitor Thread in PythonRDD to kill Python workers if a task is interrupted

Author: Ahir Reddy <ahirreddy@gmail.com>

Closes #541 from ahirreddy/python-cancel and squashes the following commits:

dfdf447 [Ahir Reddy] Changed success -> completed and made logging message clearer
6c860ab [Ahir Reddy] PR Comments
4b4100a [Ahir Reddy] Success flag
adba6ed [Ahir Reddy] Destroy python workers
27a2f8f [Ahir Reddy] Start the writer thread...
d422f7b [Ahir Reddy] Remove unnecessary vals
adda337 [Ahir Reddy] Busy wait on the context.interrupted flag, and then kill the python worker
d9e472f [Ahir Reddy] Revert "removed unnecessary vals"
5b9cae5 [Ahir Reddy] removed unnecessary vals
07b54d9 [Ahir Reddy] Fix canceling unit test
8ae9681 [Ahir Reddy] Don't interrupt worker
7722342 [Ahir Reddy] Monitor Thread for python workers
db04e16 [Ahir Reddy] Added canceling api to PySpark
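For context, cancellation is driven through job groups on the SparkContext; the PySpark additions in this patch expose the same calls to Python drivers. Below is a minimal Scala-side sketch of the flow. The group name, the sleep-based workload, and the timings are illustrative only and are not part of this patch:

import org.apache.spark.{SparkConf, SparkContext}

object CancelSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cancel-sketch").setMaster("local[2]"))

    // Run a deliberately slow job in its own thread, tagged with a job group.
    val runner = new Thread {
      override def run(): Unit = {
        sc.setJobGroup("demo-group", "long-running job")
        try {
          sc.parallelize(1 to 100, 4).map { i => Thread.sleep(1000); i }.count()
        } catch {
          case e: Exception => println(s"Job ended early: ${e.getMessage}")
        }
      }
    }
    runner.start()

    Thread.sleep(3000)              // let a few tasks start
    sc.cancelJobGroup("demo-group") // cancel every job in the group
    runner.join()
    sc.stop()
  }
}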
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala              8
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala  30
2 files changed, 37 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 915315ed74..bea435ec34 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -97,6 +97,14 @@ class SparkEnv (
       pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create()
     }
   }
+
+  private[spark]
+  def destroyPythonWorker(pythonExec: String, envVars: Map[String, String]) {
+    synchronized {
+      val key = (pythonExec, envVars)
+      pythonWorkers(key).stop()
+    }
+  }
 }
 
 object SparkEnv extends Logging {
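The new destroy path takes the same lock and uses the same (pythonExec, envVars) key as the existing create path, so a destroy can never race with a concurrent create for the same worker pool. A standalone sketch of that keyed-factory pattern, where StubWorkerFactory is invented here purely for illustration (the real class is PythonWorkerFactory):

import scala.collection.mutable

// Stand-in for PythonWorkerFactory; only the create/stop shape matters here.
class StubWorkerFactory(exec: String) {
  def create(): Unit = println(s"creating a worker for $exec")
  def stop(): Unit = println(s"stopping all workers for $exec")
}

class WorkerRegistry {
  // Factories are cached per (executable, environment) pair.
  private val workers = mutable.HashMap[(String, Map[String, String]), StubWorkerFactory]()

  def create(exec: String, env: Map[String, String]): Unit = synchronized {
    workers.getOrElseUpdate((exec, env), new StubWorkerFactory(exec)).create()
  }

  // Synchronizes on the same object as create(), mirroring destroyPythonWorker above.
  def destroy(exec: String, env: Map[String, String]): Unit = synchronized {
    workers((exec, env)).stop()
  }
}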
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 0d71fdbb03..1498b017a7 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -110,13 +110,41 @@ private[spark] class PythonRDD[T: ClassTag](
       }
     }.start()
 
+    // Necessary to distinguish between a task that has failed and a task that is finished
+    @volatile var complete: Boolean = false
+
+    // It is necessary to have a monitor thread for python workers if the user cancels with
+    // interrupts disabled. In that case we will need to explicitly kill the worker; otherwise
+    // the threads can block indefinitely.
+    new Thread(s"Worker Monitor for $pythonExec") {
+      override def run() {
+        // Wait for the task to be interrupted; when a Python task completes,
+        // the completion callback below always sets context.interrupted
+        while (!context.interrupted) {
+          Thread.sleep(2000)
+        }
+        if (!complete) {
+          try {
+            logWarning("Incomplete task interrupted: Attempting to kill Python Worker")
+            env.destroyPythonWorker(pythonExec, envVars.toMap)
+          } catch {
+            case e: Exception =>
+              logError("Exception when trying to kill worker", e)
+          }
+        }
+      }
+    }.start()
+
     /*
      * Partial fix for SPARK-1019: Attempts to stop reading the input stream since
      * other completion callbacks might invalidate the input. Because interruption
      * is not synchronous, this still leaves a potential race where the interruption is
      * processed only after the stream becomes invalid.
      */
-    context.addOnCompleteCallback(() => context.interrupted = true)
+    context.addOnCompleteCallback { () =>
+      complete = true // Indicate that the task has completed successfully
+      context.interrupted = true
+    }
 
     // Return an iterator that reads lines from the process's stdout
     val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
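Distilled from the hunk above, the monitor pattern is: poll a shared interruption flag, and use a separate completion flag to tell cancellation apart from normal completion, killing the worker only in the former case. A self-contained sketch follows; the 2-second poll interval matches the patch, while destroyWorker() is a stand-in for env.destroyPythonWorker and the simulated cancellation at the end is illustrative:

import java.util.concurrent.atomic.AtomicBoolean

object MonitorSketch {
  def main(args: Array[String]): Unit = {
    val interrupted = new AtomicBoolean(false) // plays the role of context.interrupted
    val complete = new AtomicBoolean(false)    // plays the role of the volatile `complete` flag

    def destroyWorker(): Unit = println("killing worker") // stand-in for env.destroyPythonWorker

    val monitor = new Thread("worker-monitor") {
      override def run(): Unit = {
        // Busy-wait until the flag flips; both cancellation and normal
        // completion set it, so check `complete` to tell them apart.
        while (!interrupted.get()) Thread.sleep(2000)
        if (!complete.get()) destroyWorker()
      }
    }
    monitor.start()

    // Simulate a cancellation: the completion callback never ran, so only
    // the interruption flag is set and the monitor kills the worker.
    Thread.sleep(500)
    interrupted.set(true)
    monitor.join()
  }
}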