aboutsummaryrefslogtreecommitdiff
path: root/network/common
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2015-11-18 12:53:22 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2015-11-18 12:53:22 -0800
commit7c5b641808740ba5eed05ba8204cdbaf3fc579f5 (patch)
tree0fdcef50336c771543f7e7e367920289ffabf004 /network/common
parenta416e41e285700f861559d710dbf857405bfddf6 (diff)
downloadspark-7c5b641808740ba5eed05ba8204cdbaf3fc579f5.tar.gz
spark-7c5b641808740ba5eed05ba8204cdbaf3fc579f5.tar.bz2
spark-7c5b641808740ba5eed05ba8204cdbaf3fc579f5.zip
[SPARK-10745][CORE] Separate configs between shuffle and RPC
[SPARK-6028](https://issues.apache.org/jira/browse/SPARK-6028) uses network module to implement RPC. However, there are some configurations named with `spark.shuffle` prefix in the network module. This PR refactors them to make sure the user can control them in shuffle and RPC separately. The user can use `spark.rpc.*` to set the configuration for netty RPC. Author: Shixiong Zhu <shixiong@databricks.com> Closes #9481 from zsxwing/SPARK-10745.
Diffstat (limited to 'network/common')
-rw-r--r--network/common/src/main/java/org/apache/spark/network/util/TransportConf.java65
-rw-r--r--network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java2
-rw-r--r--network/common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java2
-rw-r--r--network/common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java2
-rw-r--r--network/common/src/test/java/org/apache/spark/network/StreamSuite.java2
-rw-r--r--network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java6
-rw-r--r--network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java6
7 files changed, 60 insertions, 25 deletions
diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 3b2eff3779..115135d44a 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -23,18 +23,53 @@ import com.google.common.primitives.Ints;
* A central location that tracks all the settings we expose to users.
*/
public class TransportConf {
+
+ private final String SPARK_NETWORK_IO_MODE_KEY;
+ private final String SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY;
+ private final String SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY;
+ private final String SPARK_NETWORK_IO_BACKLOG_KEY;
+ private final String SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY;
+ private final String SPARK_NETWORK_IO_SERVERTHREADS_KEY;
+ private final String SPARK_NETWORK_IO_CLIENTTHREADS_KEY;
+ private final String SPARK_NETWORK_IO_RECEIVEBUFFER_KEY;
+ private final String SPARK_NETWORK_IO_SENDBUFFER_KEY;
+ private final String SPARK_NETWORK_SASL_TIMEOUT_KEY;
+ private final String SPARK_NETWORK_IO_MAXRETRIES_KEY;
+ private final String SPARK_NETWORK_IO_RETRYWAIT_KEY;
+ private final String SPARK_NETWORK_IO_LAZYFD_KEY;
+
private final ConfigProvider conf;
- public TransportConf(ConfigProvider conf) {
+ private final String module;
+
+ public TransportConf(String module, ConfigProvider conf) {
+ this.module = module;
this.conf = conf;
+ SPARK_NETWORK_IO_MODE_KEY = getConfKey("io.mode");
+ SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY = getConfKey("io.preferDirectBufs");
+ SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY = getConfKey("io.connectionTimeout");
+ SPARK_NETWORK_IO_BACKLOG_KEY = getConfKey("io.backLog");
+ SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY = getConfKey("io.numConnectionsPerPeer");
+ SPARK_NETWORK_IO_SERVERTHREADS_KEY = getConfKey("io.serverThreads");
+ SPARK_NETWORK_IO_CLIENTTHREADS_KEY = getConfKey("io.clientThreads");
+ SPARK_NETWORK_IO_RECEIVEBUFFER_KEY = getConfKey("io.receiveBuffer");
+ SPARK_NETWORK_IO_SENDBUFFER_KEY = getConfKey("io.sendBuffer");
+ SPARK_NETWORK_SASL_TIMEOUT_KEY = getConfKey("sasl.timeout");
+ SPARK_NETWORK_IO_MAXRETRIES_KEY = getConfKey("io.maxRetries");
+ SPARK_NETWORK_IO_RETRYWAIT_KEY = getConfKey("io.retryWait");
+ SPARK_NETWORK_IO_LAZYFD_KEY = getConfKey("io.lazyFD");
+ }
+
+ private String getConfKey(String suffix) {
+ return "spark." + module + "." + suffix;
}
/** IO mode: nio or epoll */
- public String ioMode() { return conf.get("spark.shuffle.io.mode", "NIO").toUpperCase(); }
+ public String ioMode() { return conf.get(SPARK_NETWORK_IO_MODE_KEY, "NIO").toUpperCase(); }
/** If true, we will prefer allocating off-heap byte buffers within Netty. */
public boolean preferDirectBufs() {
- return conf.getBoolean("spark.shuffle.io.preferDirectBufs", true);
+ return conf.getBoolean(SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY, true);
}
/** Connect timeout in milliseconds. Default 120 secs. */
@@ -42,23 +77,23 @@ public class TransportConf {
long defaultNetworkTimeoutS = JavaUtils.timeStringAsSec(
conf.get("spark.network.timeout", "120s"));
long defaultTimeoutMs = JavaUtils.timeStringAsSec(
- conf.get("spark.shuffle.io.connectionTimeout", defaultNetworkTimeoutS + "s")) * 1000;
+ conf.get(SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY, defaultNetworkTimeoutS + "s")) * 1000;
return (int) defaultTimeoutMs;
}
/** Number of concurrent connections between two nodes for fetching data. */
public int numConnectionsPerPeer() {
- return conf.getInt("spark.shuffle.io.numConnectionsPerPeer", 1);
+ return conf.getInt(SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY, 1);
}
/** Requested maximum length of the queue of incoming connections. Default -1 for no backlog. */
- public int backLog() { return conf.getInt("spark.shuffle.io.backLog", -1); }
+ public int backLog() { return conf.getInt(SPARK_NETWORK_IO_BACKLOG_KEY, -1); }
/** Number of threads used in the server thread pool. Default to 0, which is 2x#cores. */
- public int serverThreads() { return conf.getInt("spark.shuffle.io.serverThreads", 0); }
+ public int serverThreads() { return conf.getInt(SPARK_NETWORK_IO_SERVERTHREADS_KEY, 0); }
/** Number of threads used in the client thread pool. Default to 0, which is 2x#cores. */
- public int clientThreads() { return conf.getInt("spark.shuffle.io.clientThreads", 0); }
+ public int clientThreads() { return conf.getInt(SPARK_NETWORK_IO_CLIENTTHREADS_KEY, 0); }
/**
* Receive buffer size (SO_RCVBUF).
@@ -67,28 +102,28 @@ public class TransportConf {
* Assuming latency = 1ms, network_bandwidth = 10Gbps
* buffer size should be ~ 1.25MB
*/
- public int receiveBuf() { return conf.getInt("spark.shuffle.io.receiveBuffer", -1); }
+ public int receiveBuf() { return conf.getInt(SPARK_NETWORK_IO_RECEIVEBUFFER_KEY, -1); }
/** Send buffer size (SO_SNDBUF). */
- public int sendBuf() { return conf.getInt("spark.shuffle.io.sendBuffer", -1); }
+ public int sendBuf() { return conf.getInt(SPARK_NETWORK_IO_SENDBUFFER_KEY, -1); }
/** Timeout for a single round trip of SASL token exchange, in milliseconds. */
public int saslRTTimeoutMs() {
- return (int) JavaUtils.timeStringAsSec(conf.get("spark.shuffle.sasl.timeout", "30s")) * 1000;
+ return (int) JavaUtils.timeStringAsSec(conf.get(SPARK_NETWORK_SASL_TIMEOUT_KEY, "30s")) * 1000;
}
/**
* Max number of times we will try IO exceptions (such as connection timeouts) per request.
* If set to 0, we will not do any retries.
*/
- public int maxIORetries() { return conf.getInt("spark.shuffle.io.maxRetries", 3); }
+ public int maxIORetries() { return conf.getInt(SPARK_NETWORK_IO_MAXRETRIES_KEY, 3); }
/**
* Time (in milliseconds) that we will wait in order to perform a retry after an IOException.
* Only relevant if maxIORetries &gt; 0.
*/
public int ioRetryWaitTimeMs() {
- return (int) JavaUtils.timeStringAsSec(conf.get("spark.shuffle.io.retryWait", "5s")) * 1000;
+ return (int) JavaUtils.timeStringAsSec(conf.get(SPARK_NETWORK_IO_RETRYWAIT_KEY, "5s")) * 1000;
}
/**
@@ -101,11 +136,11 @@ public class TransportConf {
}
/**
- * Whether to initialize shuffle FileDescriptor lazily or not. If true, file descriptors are
+ * Whether to initialize FileDescriptor lazily or not. If true, file descriptors are
* created only when data is going to be transferred. This can reduce the number of open files.
*/
public boolean lazyFileDescriptor() {
- return conf.getBoolean("spark.shuffle.io.lazyFD", true);
+ return conf.getBoolean(SPARK_NETWORK_IO_LAZYFD_KEY, true);
}
/**
diff --git a/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java b/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
index dfb7740344..dc5fa1cee6 100644
--- a/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
@@ -83,7 +83,7 @@ public class ChunkFetchIntegrationSuite {
fp.write(fileContent);
fp.close();
- final TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+ final TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
fileChunk = new FileSegmentManagedBuffer(conf, testFile, 10, testFile.length() - 25);
streamManager = new StreamManager() {
diff --git a/network/common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java b/network/common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java
index 84ebb337e6..42955ef692 100644
--- a/network/common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java
@@ -60,7 +60,7 @@ public class RequestTimeoutIntegrationSuite {
public void setUp() throws Exception {
Map<String, String> configMap = Maps.newHashMap();
configMap.put("spark.shuffle.io.connectionTimeout", "2s");
- conf = new TransportConf(new MapConfigProvider(configMap));
+ conf = new TransportConf("shuffle", new MapConfigProvider(configMap));
defaultManager = new StreamManager() {
@Override
diff --git a/network/common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java b/network/common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
index 64b457b4b3..8eb56bdd98 100644
--- a/network/common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
@@ -49,7 +49,7 @@ public class RpcIntegrationSuite {
@BeforeClass
public static void setUp() throws Exception {
- TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+ TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
rpcHandler = new RpcHandler() {
@Override
public void receive(TransportClient client, byte[] message, RpcResponseCallback callback) {
diff --git a/network/common/src/test/java/org/apache/spark/network/StreamSuite.java b/network/common/src/test/java/org/apache/spark/network/StreamSuite.java
index 6dcec831de..00158fd081 100644
--- a/network/common/src/test/java/org/apache/spark/network/StreamSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/StreamSuite.java
@@ -89,7 +89,7 @@ public class StreamSuite {
fp.close();
}
- final TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+ final TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
final StreamManager streamManager = new StreamManager() {
@Override
public ManagedBuffer getChunk(long streamId, int chunkIndex) {
diff --git a/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java b/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
index f447137419..dac7d4a5b0 100644
--- a/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
@@ -52,7 +52,7 @@ public class TransportClientFactorySuite {
@Before
public void setUp() {
- conf = new TransportConf(new SystemPropertyConfigProvider());
+ conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
RpcHandler rpcHandler = new NoOpRpcHandler();
context = new TransportContext(conf, rpcHandler);
server1 = context.createServer();
@@ -76,7 +76,7 @@ public class TransportClientFactorySuite {
Map<String, String> configMap = Maps.newHashMap();
configMap.put("spark.shuffle.io.numConnectionsPerPeer", Integer.toString(maxConnections));
- TransportConf conf = new TransportConf(new MapConfigProvider(configMap));
+ TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(configMap));
RpcHandler rpcHandler = new NoOpRpcHandler();
TransportContext context = new TransportContext(conf, rpcHandler);
@@ -182,7 +182,7 @@ public class TransportClientFactorySuite {
@Test
public void closeIdleConnectionForRequestTimeOut() throws IOException, InterruptedException {
- TransportConf conf = new TransportConf(new ConfigProvider() {
+ TransportConf conf = new TransportConf("shuffle", new ConfigProvider() {
@Override
public String get(String name) {
diff --git a/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java b/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
index 3469e84e7f..b146899670 100644
--- a/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
@@ -207,7 +207,7 @@ public class SparkSaslSuite {
public void testEncryptedMessageChunking() throws Exception {
File file = File.createTempFile("sasltest", ".txt");
try {
- TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+ TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
byte[] data = new byte[8 * 1024];
new Random().nextBytes(data);
@@ -242,7 +242,7 @@ public class SparkSaslSuite {
final File file = File.createTempFile("sasltest", ".txt");
SaslTestCtx ctx = null;
try {
- final TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+ final TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
StreamManager sm = mock(StreamManager.class);
when(sm.getChunk(anyLong(), anyInt())).thenAnswer(new Answer<ManagedBuffer>() {
@Override
@@ -368,7 +368,7 @@ public class SparkSaslSuite {
boolean disableClientEncryption)
throws Exception {
- TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+ TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
SecretKeyHolder keyHolder = mock(SecretKeyHolder.class);
when(keyHolder.getSaslUser(anyString())).thenReturn("user");