aboutsummaryrefslogtreecommitdiff
path: root/network
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2014-12-09 19:29:09 -0800
committerAaron Davidson <aaron@databricks.com>2014-12-09 19:29:09 -0800
commit9bd9334f588dbb44d01554f9f4ca68a153a48993 (patch)
tree11d69012599e04c5c0bcdb139f3873796d41bfb9 /network
parent2b9b72682e587909a84d3ace214c22cec830eeaf (diff)
downloadspark-9bd9334f588dbb44d01554f9f4ca68a153a48993.tar.gz
spark-9bd9334f588dbb44d01554f9f4ca68a153a48993.tar.bz2
spark-9bd9334f588dbb44d01554f9f4ca68a153a48993.zip
Config updates for the new shuffle transport.
Author: Reynold Xin <rxin@databricks.com> Closes #3657 from rxin/conf-update and squashes the following commits: 7370eab [Reynold Xin] Config updates for the new shuffle transport.
Diffstat (limited to 'network')
-rw-r--r--network/common/src/main/java/org/apache/spark/network/util/TransportConf.java8
-rw-r--r--network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java2
-rw-r--r--network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java2
3 files changed, 6 insertions, 6 deletions
diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
index f60573998f..13b37f96f8 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -35,14 +35,14 @@ public class TransportConf {
return conf.getBoolean("spark.shuffle.io.preferDirectBufs", true);
}
- /** Connect timeout in secs. Default 120 secs. */
+ /** Connect timeout in milliseconds. Default 120 secs. */
public int connectionTimeoutMs() {
return conf.getInt("spark.shuffle.io.connectionTimeout", 120) * 1000;
}
/** Number of concurrent connections between two nodes for fetching data. **/
public int numConnectionsPerPeer() {
- return conf.getInt("spark.shuffle.io.numConnectionsPerPeer", 2);
+ return conf.getInt("spark.shuffle.io.numConnectionsPerPeer", 1);
}
/** Requested maximum length of the queue of incoming connections. Default -1 for no backlog. */
@@ -67,7 +67,7 @@ public class TransportConf {
public int sendBuf() { return conf.getInt("spark.shuffle.io.sendBuffer", -1); }
/** Timeout for a single round trip of SASL token exchange, in milliseconds. */
- public int saslRTTimeout() { return conf.getInt("spark.shuffle.sasl.timeout", 30000); }
+ public int saslRTTimeoutMs() { return conf.getInt("spark.shuffle.sasl.timeout", 30) * 1000; }
/**
* Max number of times we will try IO exceptions (such as connection timeouts) per request.
@@ -79,7 +79,7 @@ public class TransportConf {
* Time (in milliseconds) that we will wait in order to perform a retry after an IOException.
* Only relevant if maxIORetries &gt; 0.
*/
- public int ioRetryWaitTime() { return conf.getInt("spark.shuffle.io.retryWaitMs", 5000); }
+ public int ioRetryWaitTimeMs() { return conf.getInt("spark.shuffle.io.retryWait", 5) * 1000; }
/**
* Minimum size of a block that we should start using memory map rather than reading in through
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java
index 7bc91e3753..33aa134434 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java
@@ -59,7 +59,7 @@ public class SaslClientBootstrap implements TransportClientBootstrap {
ByteBuf buf = Unpooled.buffer(msg.encodedLength());
msg.encode(buf);
- byte[] response = client.sendRpcSync(buf.array(), conf.saslRTTimeout());
+ byte[] response = client.sendRpcSync(buf.array(), conf.saslRTTimeoutMs());
payload = saslClient.response(response);
}
} finally {
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
index f8a1a26686..4bb0498e5d 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
@@ -106,7 +106,7 @@ public class RetryingBlockFetcher {
this.fetchStarter = fetchStarter;
this.listener = listener;
this.maxRetries = conf.maxIORetries();
- this.retryWaitTime = conf.ioRetryWaitTime();
+ this.retryWaitTime = conf.ioRetryWaitTimeMs();
this.outstandingBlocksIds = Sets.newLinkedHashSet();
Collections.addAll(outstandingBlocksIds, blockIds);
this.currentListener = new RetryingBlockFetchListener();