diff options
author | Varun Saxena <vsaxena.varun@gmail.com> | 2015-01-05 10:32:37 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-01-05 10:32:37 -0800 |
commit | d3f07fd23cc26a70f44c52e24445974d4885d58a (patch) | |
tree | c98d0adeda2bc972b4e332f8776b985d6c0255ce | |
parent | 5c506cecb933b156b2f06a688ee08c4347bf0d47 (diff) | |
download | spark-d3f07fd23cc26a70f44c52e24445974d4885d58a.tar.gz spark-d3f07fd23cc26a70f44c52e24445974d4885d58a.tar.bz2 spark-d3f07fd23cc26a70f44c52e24445974d4885d58a.zip |
[SPARK-4688] Have a single shared network timeout in Spark
[SPARK-4688] Have a single shared network timeout in Spark
Author: Varun Saxena <vsaxena.varun@gmail.com>
Author: varunsaxena <vsaxena.varun@gmail.com>
Closes #3562 from varunsaxena/SPARK-4688 and squashes the following commits:
6e97f72 [Varun Saxena] [SPARK-4688] Single shared network timeout
cd783a2 [Varun Saxena] SPARK-4688
d6f8c29 [Varun Saxena] SCALA-4688
9562b15 [Varun Saxena] SPARK-4688
a75f014 [varunsaxena] SPARK-4688
594226c [varunsaxena] SPARK-4688
5 files changed, 21 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala index 243b71c980..98455c0968 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala @@ -81,7 +81,8 @@ private[nio] class ConnectionManager( private val ackTimeoutMonitor = new HashedWheelTimer(Utils.namedThreadFactory("AckTimeoutMonitor")) - private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 60) + private val ackTimeout = + conf.getInt("spark.core.connection.ack.wait.timeout", conf.getInt("spark.network.timeout", 100)) // Get the thread counts from the Spark Configuration. // diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 9cbda41223..9d77cf2788 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -52,8 +52,11 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus private val akkaTimeout = AkkaUtils.askTimeout(conf) - val slaveTimeout = conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", - math.max(conf.getInt("spark.executor.heartbeatInterval", 10000) * 3, 45000)) + val slaveTimeout = { + val defaultMs = math.max(conf.getInt("spark.executor.heartbeatInterval", 10000) * 3, 45000) + val networkTimeout = conf.getInt("spark.network.timeout", defaultMs / 1000) + conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", networkTimeout * 1000) + } val checkTimeoutInterval = conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000) diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 8c2457f56b..64e3a5416c 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -65,7 +65,7 @@ private[spark] object AkkaUtils extends Logging { val akkaThreads = conf.getInt("spark.akka.threads", 4) val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15) - val akkaTimeout = conf.getInt("spark.akka.timeout", 100) + val akkaTimeout = conf.getInt("spark.akka.timeout", conf.getInt("spark.network.timeout", 100)) val akkaFrameSize = maxFrameSizeBytes(conf) val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false) val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off" diff --git a/docs/configuration.md b/docs/configuration.md index 9bb6499993..7ada67fc30 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -819,6 +819,16 @@ Apart from these, the following properties are also available, and may be useful </td> </tr> <tr> + <td><code>spark.network.timeout</code></td> + <td>100</td> + <td> + Default timeout for all network interactions, in seconds. This config will be used in + place of <code>spark.core.connection.ack.wait.timeout</code>, <code>spark.akka.timeout</code>, + <code>spark.storage.blockManagerSlaveTimeoutMs</code> or <code>spark.shuffle.io.connectionTimeout</code>, + if they are not configured. + </td> +</tr> +<tr> <td><code>spark.akka.heartbeat.pauses</code></td> <td>6000</td> <td> diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java index 7c9adf52af..e34382da22 100644 --- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -37,7 +37,9 @@ public class TransportConf { /** Connect timeout in milliseconds. Default 120 secs. */ public int connectionTimeoutMs() { - return conf.getInt("spark.shuffle.io.connectionTimeout", 120) * 1000; + int timeout = + conf.getInt("spark.shuffle.io.connectionTimeout", conf.getInt("spark.network.timeout", 100)); + return timeout * 1000; } /** Number of concurrent connections between two nodes for fetching data. */ |