Diffstat (limited to 'streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala | 2 ++
1 file changed, 2 insertions(+), 0 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 10d64f98ac..8d83dc8a8f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -22,6 +22,7 @@ import scala.util.{Failure, Success, Try}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
+import org.apache.spark.streaming.api.python.PythonDStream
 import org.apache.spark.streaming.util.RecurringTimer
 import org.apache.spark.util.{Clock, EventLoop, ManualClock, Utils}
 
@@ -252,6 +253,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
         jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
       case Failure(e) =>
         jobScheduler.reportError("Error generating jobs for time " + time, e)
+        PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
     }
     eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
   }
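
The added call gives the driver a chance to shut down the StreamingContext when job generation fails because the Python worker backing a PySpark stream has died. As a rough sketch only (the helper name, the exception matching, and the stop semantics below are assumptions for illustration, not Spark's actual PythonDStream implementation), such a guard might look like this:

import java.net.ConnectException

import org.apache.spark.streaming.StreamingContext

// Hypothetical helper, for illustration; the real
// PythonDStream.stopStreamingContextIfPythonProcessIsDead may differ.
object PythonProcessGuard {
  // If the failure suggests the Python callback server is unreachable
  // (e.g. the Python process crashed), stop the StreamingContext so the
  // driver does not keep generating jobs that can never succeed.
  def stopIfPythonProcessIsDead(ssc: StreamingContext, e: Throwable): Unit = {
    e match {
      case _: ConnectException =>
        // Assumption: a ConnectException here means the Python process is gone.
        ssc.stop(stopSparkContext = false, stopGracefully = false)
      case _ =>
        // Other failures are already surfaced via jobScheduler.reportError above.
    }
  }
}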