author | Shixiong Zhu <shixiong@databricks.com> | 2017-02-17 19:04:45 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2017-02-17 19:04:45 -0800 |
commit | 15b144d2bf4555981a51276277c08a9c11a402f6 (patch) | |
tree | bf4ea06ed8854d5b9c871ee1411670bd0c6b281d /external/kafka-0-10-sql | |
parent | 988f6d7ee8017756645f2af9993ee020332442cb (diff) | |
download | spark-15b144d2bf4555981a51276277c08a9c11a402f6.tar.gz spark-15b144d2bf4555981a51276277c08a9c11a402f6.tar.bz2 spark-15b144d2bf4555981a51276277c08a9c11a402f6.zip |
[SPARK-19617][SS] Fix the race condition when starting and stopping a query quickly
## What changes were proposed in this pull request?
The streaming thread in `StreamExecution` decides whether it should exit in two ways:
- It catches an `InterruptedException`.
- It sees that `StreamExecution.state` is `TERMINATED`.
When a query is started and stopped quickly, both checks may fail:
- The thread hits [HADOOP-14084](https://issues.apache.org/jira/browse/HADOOP-14084), which swallows the `InterruptedException`.
- `StreamExecution.stop` is called before `state` becomes `ACTIVE`. Then [runBatches](https://github.com/apache/spark/blob/dcc2d540a53f0bd04baead43fdee1c170ef2b9f3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L252) changes the state from `TERMINATED` to `ACTIVE`.
If the above cases both happen, the query will hang forever.
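The second failure can be replayed deterministically in a few lines. This is a simplified sketch, not the actual `StreamExecution` code: `RacySketch`, its state objects, and `replay` are hypothetical names, and the interleaving is forced by calling the two steps in order on one thread.

```scala
// Hypothetical stand-in for the query state machine; not Spark's classes.
object RacySketch {
  sealed trait State
  case object Initializing extends State
  case object Active extends State
  case object Terminated extends State

  @volatile private var state: State = Initializing

  // What a caller of stop() does: mark the query as terminated.
  def stop(): Unit = { state = Terminated }

  // Unconditional write, mirroring the pre-fix behaviour described above.
  def runBatches(): State = {
    state = Active
    state
  }

  // Replays the problematic order: stop() first, then the streaming thread starts.
  def replay(): State = {
    state = Initializing
    stop()
    runBatches()
  }
}
```

After `replay()`, the state is `Active` even though `stop()` was called, so a loop that only exits on the terminated state would never see it.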
This PR changes `state` to an `AtomicReference` and uses `compareAndSet` to make sure the state only changes from `INITIALIZING` to `ACTIVE`. It also removes the `runUninterruptibly` hack from `HDFSMetadata`, because HADOOP-14084 no longer causes a problem once the race condition is fixed.
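A minimal sketch of the compare-and-set idea, assuming a three-state machine. `CasSketch`, `Query`, and `tryActivate` are hypothetical names chosen for illustration; only `java.util.concurrent.atomic.AtomicReference` and its `compareAndSet` method come from the JDK.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for the query state machine; not Spark's classes.
object CasSketch {
  sealed trait State
  case object Initializing extends State
  case object Active extends State
  case object Terminated extends State

  final class Query {
    private val state = new AtomicReference[State](Initializing)

    // stop() may run at any time, including before the query becomes active.
    def stop(): Unit = state.set(Terminated)

    // Moves to Active only if the state is still Initializing.
    // Returns false (and leaves the state alone) if stop() got there first.
    def tryActivate(): Boolean = state.compareAndSet(Initializing, Active)

    def current: State = state.get()
  }
}
```

With this shape, a `stop()` that lands before activation leaves the state at `Terminated`, and the streaming thread can treat a `false` result from `tryActivate()` as a signal to exit instead of entering its loop.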
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #16947 from zsxwing/SPARK-19617.
Diffstat (limited to 'external/kafka-0-10-sql')
-rw-r--r-- | external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala | 2 |
1 files changed, 1 insertions, 1 deletions
    diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
    index 10b35c74f4..efec51d097 100644
    --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
    +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
    @@ -55,7 +55,7 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext {
       }
    -  testWithUninterruptibleThread("OffsetSeqLog serialization - deserialization") {
    +  test("OffsetSeqLog serialization - deserialization") {
         withTempDir { temp =>
           // use non-existent directory to test whether log make the dir
           val dir = new File(temp, "dir")