aboutsummaryrefslogtreecommitdiff
path: root/external/kafka-0-10-sql
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2017-02-17 19:04:45 -0800
committerShixiong Zhu <shixiong@databricks.com>2017-02-17 19:04:45 -0800
commit15b144d2bf4555981a51276277c08a9c11a402f6 (patch)
treebf4ea06ed8854d5b9c871ee1411670bd0c6b281d /external/kafka-0-10-sql
parent988f6d7ee8017756645f2af9993ee020332442cb (diff)
downloadspark-15b144d2bf4555981a51276277c08a9c11a402f6.tar.gz
spark-15b144d2bf4555981a51276277c08a9c11a402f6.tar.bz2
spark-15b144d2bf4555981a51276277c08a9c11a402f6.zip
[SPARK-19617][SS] Fix the race condition when starting and stopping a query quickly
## What changes were proposed in this pull request? The streaming thread in StreamExecution uses the following ways to check if it should exit: - Catch an InterruptException. - `StreamExecution.state` is TERMINATED. When starting and stopping a query quickly, the above two checks may both fail: - Hit [HADOOP-14084](https://issues.apache.org/jira/browse/HADOOP-14084) and swallow InterruptException - StreamExecution.stop is called before `state` becomes `ACTIVE`. Then [runBatches](https://github.com/apache/spark/blob/dcc2d540a53f0bd04baead43fdee1c170ef2b9f3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L252) changes the state from `TERMINATED` to `ACTIVE`. If the above cases both happen, the query will hang forever. This PR changes `state` to `AtomicReference` and uses`compareAndSet` to make sure we only change the state from `INITIALIZING` to `ACTIVE`. It also removes the `runUninterruptibly` hack from ``HDFSMetadata`, because HADOOP-14084 won't cause any problem after we fix the race condition. ## How was this patch tested? Jenkins Author: Shixiong Zhu <shixiong@databricks.com> Closes #16947 from zsxwing/SPARK-19617.
Diffstat (limited to 'external/kafka-0-10-sql')
-rw-r--r--external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala2
1 files changed, 1 insertions, 1 deletions
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
index 10b35c74f4..efec51d097 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
@@ -55,7 +55,7 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext {
}
- testWithUninterruptibleThread("OffsetSeqLog serialization - deserialization") {
+ test("OffsetSeqLog serialization - deserialization") {
withTempDir { temp =>
// use non-existent directory to test whether log make the dir
val dir = new File(temp, "dir")