Diffstat (limited to 'external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala')
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala  3
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
index 5b5a9ac48c..98394251bb 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
@@ -66,7 +66,8 @@ private[spark] class KafkaRDD[K, V](
" must be set to false for executor kafka params, else offsets may commit before processing")
// TODO is it necessary to have separate configs for initial poll time vs ongoing poll time?
- private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 512)
+ private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms",
+ conf.getTimeAsMs("spark.network.timeout", "120s"))
private val cacheInitialCapacity =
conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
private val cacheMaxCapacity =
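
For context, a minimal sketch of how this change behaves from the user side, assuming a standard SparkConf. The application name and the explicit 10-second override are illustrative assumptions, not part of this commit; the fallback expression mirrors the added lines above.

import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("kafka-poll-timeout-sketch") // hypothetical app name

// Explicit override: the Kafka-specific key still takes precedence when set.
conf.set("spark.streaming.kafka.consumer.poll.ms", "10000") // illustrative value

// With the key unset, the poll timeout now falls back to spark.network.timeout
// (default 120s) instead of the old hard-coded 512 ms, as in the diff above:
val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms",
  conf.getTimeAsMs("spark.network.timeout", "120s"))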