aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVarun Saxena <vsaxena.varun@gmail.com>2015-01-05 10:32:37 -0800
committerReynold Xin <rxin@databricks.com>2015-01-05 10:32:37 -0800
commitd3f07fd23cc26a70f44c52e24445974d4885d58a (patch)
treec98d0adeda2bc972b4e332f8776b985d6c0255ce
parent5c506cecb933b156b2f06a688ee08c4347bf0d47 (diff)
downloadspark-d3f07fd23cc26a70f44c52e24445974d4885d58a.tar.gz
spark-d3f07fd23cc26a70f44c52e24445974d4885d58a.tar.bz2
spark-d3f07fd23cc26a70f44c52e24445974d4885d58a.zip
[SPARK-4688] Have a single shared network timeout in Spark
[SPARK-4688] Have a single shared network timeout in Spark Author: Varun Saxena <vsaxena.varun@gmail.com> Author: varunsaxena <vsaxena.varun@gmail.com> Closes #3562 from varunsaxena/SPARK-4688 and squashes the following commits: 6e97f72 [Varun Saxena] [SPARK-4688] Single shared network timeout cd783a2 [Varun Saxena] SPARK-4688 d6f8c29 [Varun Saxena] SCALA-4688 9562b15 [Varun Saxena] SPARK-4688 a75f014 [varunsaxena] SPARK-4688 594226c [varunsaxena] SPARK-4688
-rw-r--r--core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/util/AkkaUtils.scala2
-rw-r--r--docs/configuration.md10
-rw-r--r--network/common/src/main/java/org/apache/spark/network/util/TransportConf.java4
5 files changed, 21 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index 243b71c980..98455c0968 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -81,7 +81,8 @@ private[nio] class ConnectionManager(
private val ackTimeoutMonitor =
new HashedWheelTimer(Utils.namedThreadFactory("AckTimeoutMonitor"))
- private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 60)
+ private val ackTimeout =
+ conf.getInt("spark.core.connection.ack.wait.timeout", conf.getInt("spark.network.timeout", 100))
// Get the thread counts from the Spark Configuration.
//
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 9cbda41223..9d77cf2788 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -52,8 +52,11 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
private val akkaTimeout = AkkaUtils.askTimeout(conf)
- val slaveTimeout = conf.getLong("spark.storage.blockManagerSlaveTimeoutMs",
- math.max(conf.getInt("spark.executor.heartbeatInterval", 10000) * 3, 45000))
+ val slaveTimeout = {
+ val defaultMs = math.max(conf.getInt("spark.executor.heartbeatInterval", 10000) * 3, 45000)
+ val networkTimeout = conf.getInt("spark.network.timeout", defaultMs / 1000)
+ conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", networkTimeout * 1000)
+ }
val checkTimeoutInterval = conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000)
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 8c2457f56b..64e3a5416c 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -65,7 +65,7 @@ private[spark] object AkkaUtils extends Logging {
val akkaThreads = conf.getInt("spark.akka.threads", 4)
val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
- val akkaTimeout = conf.getInt("spark.akka.timeout", 100)
+ val akkaTimeout = conf.getInt("spark.akka.timeout", conf.getInt("spark.network.timeout", 100))
val akkaFrameSize = maxFrameSizeBytes(conf)
val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
diff --git a/docs/configuration.md b/docs/configuration.md
index 9bb6499993..7ada67fc30 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -819,6 +819,16 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
+ <td><code>spark.network.timeout</code></td>
+ <td>100</td>
+ <td>
+ Default timeout for all network interactions, in seconds. This config will be used in
+ place of <code>spark.core.connection.ack.wait.timeout</code>, <code>spark.akka.timeout</code>,
+ <code>spark.storage.blockManagerSlaveTimeoutMs</code> or <code>spark.shuffle.io.connectionTimeout</code>,
+ if they are not configured.
+ </td>
+</tr>
+<tr>
<td><code>spark.akka.heartbeat.pauses</code></td>
<td>6000</td>
<td>
diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 7c9adf52af..e34382da22 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -37,7 +37,9 @@ public class TransportConf {
/** Connect timeout in milliseconds. Default 120 secs. */
public int connectionTimeoutMs() {
- return conf.getInt("spark.shuffle.io.connectionTimeout", 120) * 1000;
+ int timeout =
+ conf.getInt("spark.shuffle.io.connectionTimeout", conf.getInt("spark.network.timeout", 100));
+ return timeout * 1000;
}
/** Number of concurrent connections between two nodes for fetching data. */