author    Shixiong Zhu <shixiong@databricks.com>    2016-12-21 15:39:36 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>    2016-12-21 15:39:36 -0800
commit    95efc895e929701a605313b87ad0cd91edee2f81 (patch)
tree      d02de15c4bb7766e4975bbcf85140bfb76267ce0 /dev
parent    354e936187708a404c0349e3d8815a47953123ec (diff)
[SPARK-18588][SS][KAFKA] Create a new KafkaConsumer when error happens to fix the flaky test
## What changes were proposed in this pull request?

When KafkaSource fails on Kafka errors, we should create a new consumer to retry rather than reuse the existing broken one, because the broken consumer is likely to fail again. This PR also assigns a new group id to the newly created consumer to guard against a possible race condition: the broken consumer might be unable to reach the Kafka cluster in `close` while the new consumer can. It is unclear whether this can actually happen; the new group id is a safety measure so the Kafka cluster never sees two consumers with the same group id within a short time window. (Note: CachedKafkaConsumer does not need this fix since `assign` never uses the group id.)

## How was this patch tested?

In https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/70370/console , this flaky test was run 120 times and all runs passed.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16282 from zsxwing/kafka-fix.
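A minimal sketch of the retry idea described above, written in Python with the kafka-python client purely for illustration: the actual fix lives in Spark's Scala KafkaSource, and the function name, parameters, and group-id prefix below are hypothetical. On a Kafka error, the broken consumer is discarded and a fresh consumer is created under a brand-new group id, so the old and new consumers never share a group id while the old one is still shutting down.

```python
import uuid

from kafka import KafkaConsumer  # kafka-python; illustrative stand-in for the Java/Scala client


def recreate_consumer(broken_consumer, bootstrap_servers):
    """Hypothetical sketch: drop the broken consumer and retry with a fresh one.

    A new, unique group id is used for each recreated consumer so that the
    cluster never sees two live consumers sharing a group id while the old
    (possibly unreachable) consumer is still being closed.
    """
    try:
        broken_consumer.close()
    except Exception:
        pass  # best effort; the old consumer may already be unusable
    return KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id="spark-kafka-source-" + str(uuid.uuid4()),  # fresh group id per retry
        enable_auto_commit=False,
    )
```

The key point is that the consumer object and its group id are replaced together; reusing either one would keep the retry tied to the possibly broken session.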
Diffstat (limited to 'dev')
-rw-r--r--  dev/sparktestsupport/modules.py | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 1a7cf9a2c9..10ad1fe3aa 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -245,7 +245,8 @@ streaming_kafka_0_10 = Module(
name="streaming-kafka-0-10",
dependencies=[streaming],
source_file_regexes=[
- "external/kafka-0-10",
+ # The ending "/" is necessary otherwise it will include "sql-kafka" codes
+ "external/kafka-0-10/",
"external/kafka-0-10-assembly",
],
sbt_test_goals=[
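To illustrate the comment added in the diff about the trailing "/": a small, hypothetical Python sketch, assuming the module patterns are applied to changed file paths from the start of the string (as with `re.match`). The file paths below are made up. Without the trailing slash, the `external/kafka-0-10` pattern also picks up files under `external/kafka-0-10-sql/`.

```python
import re

changed_files = [
    "external/kafka-0-10/src/main/scala/SomeStreamingClass.scala",
    "external/kafka-0-10-sql/src/main/scala/KafkaSource.scala",
]

for pattern in ("external/kafka-0-10", "external/kafka-0-10/"):
    # Prefix-style match: the pattern must match at the beginning of the path.
    matched = [f for f in changed_files if re.match(pattern, f)]
    print(pattern, "->", matched)
```

The first pattern matches both paths, so sql-kafka sources would be attributed to the streaming-kafka-0-10 module; the second matches only files under the streaming Kafka directory.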