aboutsummaryrefslogtreecommitdiff
path: root/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
diff options
context:
space:
mode:
Diffstat (limited to 'external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala')
-rw-r--r--external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala5
1 files changed, 4 insertions, 1 deletions
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 61cba737d1..b21508cd7e 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -88,7 +88,10 @@ private[kafka010] case class KafkaSource(
private val sc = sqlContext.sparkContext
- private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
+ private val pollTimeoutMs = sourceOptions.getOrElse(
+ "kafkaConsumer.pollTimeoutMs",
+ sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+ ).toLong
private val maxOffsetFetchAttempts =
sourceOptions.getOrElse("fetchOffset.numRetries", "3").toInt