diff options
author | Liwei Lin <lwlin7@gmail.com> | 2016-04-22 15:42:47 -0700 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-04-22 15:42:47 -0700 |
commit | fde1340c768e18e9628e5f0eeb2f283c74c294fa (patch) | |
tree | 84968280a8426d42893b03ebfa9e4dd928c08b00 | |
parent | 3647120a5a879edf3a96a5fd68fb7aa849ad57ef (diff) | |
download | spark-fde1340c768e18e9628e5f0eeb2f283c74c294fa.tar.gz spark-fde1340c768e18e9628e5f0eeb2f283c74c294fa.tar.bz2 spark-fde1340c768e18e9628e5f0eeb2f283c74c294fa.zip |
[SPARK-14701][STREAMING] First stop the event loop, then stop the checkpoint writer in JobGenerator
Currently if we call `streamingContext.stop` (e.g. in a `StreamingListener.onBatchCompleted` callback) when a batch is about to complete, a `rejectedException` may get thrown from `checkPointWriter.executor`, since the `eventLoop` will try to process `DoCheckpoint` events even after the `checkPointWriter.executor` was stopped.
Please see [SPARK-14701](https://issues.apache.org/jira/browse/SPARK-14701) for details and stack traces.
## What changes were proposed in this pull request?
Reversed the stopping order of `event loop` and `checkpoint writer`.
## How was this patch tested?
Existing test suits.
(no dedicated test suits were added because the change is simple to reason about)
Author: Liwei Lin <lwlin7@gmail.com>
Closes #12489 from lw-lin/spark-14701.
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala | 4 |
1 files changed, 2 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 307ff1f7ec..8f9421fc09 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -154,9 +154,9 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { graph.stop() } - // Stop the event loop and checkpoint writer - if (shouldCheckpoint) checkpointWriter.stop() + // First stop the event loop, then stop the checkpoint writer; see SPARK-14701 eventLoop.stop() + if (shouldCheckpoint) checkpointWriter.stop() logInfo("Stopped JobGenerator") } |