about summary refs log tree commit diff
path: root/external
diff options
context:
space:
mode:
authorcody koeninger <cody@koeninger.org>2016-11-03 14:43:25 -0700
committerMichael Armbrust <michael@databricks.com>2016-11-03 14:43:25 -0700
commit67659c9afaeb2289e56fd87fafee953e8f050383 (patch)
tree6a00e1c2e005a4d1d099e72b1c0fa665f6872328 /external
parent098e4ca9c7af61e64839a50c65be449749af6482 (diff)
downloadspark-67659c9afaeb2289e56fd87fafee953e8f050383.tar.gz
spark-67659c9afaeb2289e56fd87fafee953e8f050383.tar.bz2
spark-67659c9afaeb2289e56fd87fafee953e8f050383.zip
[SPARK-18212][SS][KAFKA] increase executor poll timeout
## What changes were proposed in this pull request?

Increase poll timeout to try and address flaky test

## How was this patch tested?

Ran existing unit tests

Author: cody koeninger <cody@koeninger.org>

Closes #15737 from koeninger/SPARK-18212.
Diffstat (limited to 'external')
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala | 5
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala | 3
2 files changed, 6 insertions, 2 deletions
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 61cba737d1..b21508cd7e 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -88,7 +88,10 @@ private[kafka010] case class KafkaSource(
private val sc = sqlContext.sparkContext
- private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
+ private val pollTimeoutMs = sourceOptions.getOrElse(
+ "kafkaConsumer.pollTimeoutMs",
+ sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+ ).toLong
private val maxOffsetFetchAttempts =
sourceOptions.getOrElse("fetchOffset.numRetries", "3").toInt
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
index 5b5a9ac48c..98394251bb 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
@@ -66,7 +66,8 @@ private[spark] class KafkaRDD[K, V](
" must be set to false for executor kafka params, else offsets may commit before processing")
// TODO is it necessary to have separate configs for initial poll time vs ongoing poll time?
- private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 512)
+ private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms",
+ conf.getTimeAsMs("spark.network.timeout", "120s"))
private val cacheInitialCapacity =
conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
private val cacheMaxCapacity =