From 67659c9afaeb2289e56fd87fafee953e8f050383 Mon Sep 17 00:00:00 2001
From: cody koeninger
Date: Thu, 3 Nov 2016 14:43:25 -0700
Subject: [SPARK-18212][SS][KAFKA] increase executor poll timeout

## What changes were proposed in this pull request?

Increase the poll timeout to try to address a flaky test.

## How was this patch tested?

Ran existing unit tests.

Author: cody koeninger

Closes #15737 from koeninger/SPARK-18212.
---
 .../src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala | 5 ++++-
 .../main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala  | 3 ++-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 61cba737d1..b21508cd7e 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -88,7 +88,10 @@ private[kafka010] case class KafkaSource(
 
   private val sc = sqlContext.sparkContext
 
-  private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
+  private val pollTimeoutMs = sourceOptions.getOrElse(
+    "kafkaConsumer.pollTimeoutMs",
+    sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+  ).toLong
 
   private val maxOffsetFetchAttempts =
     sourceOptions.getOrElse("fetchOffset.numRetries", "3").toInt
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
index 5b5a9ac48c..98394251bb 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
@@ -66,7 +66,8 @@ private[spark] class KafkaRDD[K, V](
     " must be set to false for executor kafka params, else offsets may commit before processing")
 
   // TODO is it necessary to have separate configs for initial poll time vs ongoing poll time?
-  private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 512)
+  private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms",
+    conf.getTimeAsMs("spark.network.timeout", "120s"))
   private val cacheInitialCapacity =
     conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
   private val cacheMaxCapacity =
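
For context (not part of the patch): after this change, the Structured Streaming Kafka source falls back to `spark.network.timeout` (120s) for its executor poll timeout instead of the old 512ms, unless the user overrides it. Below is a minimal sketch of such an override; the broker address and topic name are placeholders.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-poll-timeout-example").getOrCreate()

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("subscribe", "events")                       // placeholder topic
  // Explicit override of the per-poll timeout; if omitted, the default
  // after this patch is sc.conf.getTimeAsMs("spark.network.timeout", "120s").
  .option("kafkaConsumer.pollTimeoutMs", "30000")
  .load()
```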
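
The DStream-based kafka-0-10 integration gets the same fallback via `KafkaRDD`. A minimal sketch of the equivalent override there, with a placeholder app name and timeout value:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kafka-rdd-poll-timeout-example") // placeholder app name
  // Explicit per-poll timeout in milliseconds; if unset, KafkaRDD now
  // defaults to spark.network.timeout rather than the old 512ms.
  .set("spark.streaming.kafka.consumer.poll.ms", "30000")

// For comparison, this is the fallback computation the patch introduces:
val fallbackMs: Long = conf.getTimeAsMs("spark.network.timeout", "120s") // 120000 by default
```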