-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala | 2
-rw-r--r--  network/common/src/main/java/org/apache/spark/network/util/TransportConf.java | 65
-rw-r--r--  network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java | 2
-rw-r--r--  network/common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java | 2
-rw-r--r--  network/common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java | 2
-rw-r--r--  network/common/src/test/java/org/apache/spark/network/StreamSuite.java | 2
-rw-r--r--  network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java | 6
-rw-r--r--  network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java | 6
-rw-r--r--  network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java | 2
-rw-r--r--  network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java | 2
-rw-r--r--  network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java | 2
-rw-r--r--  network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java | 2
-rw-r--r--  network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java | 2
-rw-r--r--  network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java | 2
-rw-r--r--  network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java | 2
23 files changed, 84 insertions(+), 50 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index a039d543c3..e8a1e35c3f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -45,7 +45,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
private val useSasl: Boolean = securityManager.isAuthenticationEnabled()
- private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0)
+ private val transportConf =
+ SparkTransportConf.fromSparkConf(sparkConf, "shuffle", numUsableCores = 0)
private val blockHandler = newShuffleBlockHandler(transportConf)
private val transportContext: TransportContext =
new TransportContext(transportConf, blockHandler, true)
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index 70a42f9045..b0694e3c6c 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -41,7 +41,7 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
// TODO: Don't use Java serialization, use a more cross-version compatible serialization format.
private val serializer = new JavaSerializer(conf)
private val authEnabled = securityManager.isAuthenticationEnabled()
- private val transportConf = SparkTransportConf.fromSparkConf(conf, numCores)
+ private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numCores)
private[this] var transportContext: TransportContext = _
private[this] var server: TransportServer = _
diff --git a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
index cef203006d..84833f59d7 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
@@ -40,23 +40,23 @@ object SparkTransportConf {
/**
* Utility for creating a [[TransportConf]] from a [[SparkConf]].
+ * @param _conf the [[SparkConf]]
+ * @param module the module name
* @param numUsableCores if nonzero, this will restrict the server and client threads to only
* use the given number of cores, rather than all of the machine's cores.
* This restriction will only occur if these properties are not already set.
*/
- def fromSparkConf(_conf: SparkConf, numUsableCores: Int = 0): TransportConf = {
+ def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 0): TransportConf = {
val conf = _conf.clone
// Specify thread configuration based on our JVM's allocation of cores (rather than necessarily
// assuming we have all the machine's cores).
// NB: Only set if serverThreads/clientThreads not already set.
val numThreads = defaultNumThreads(numUsableCores)
- conf.set("spark.shuffle.io.serverThreads",
- conf.get("spark.shuffle.io.serverThreads", numThreads.toString))
- conf.set("spark.shuffle.io.clientThreads",
- conf.get("spark.shuffle.io.clientThreads", numThreads.toString))
+ conf.setIfMissing(s"spark.$module.io.serverThreads", numThreads.toString)
+ conf.setIfMissing(s"spark.$module.io.clientThreads", numThreads.toString)
- new TransportConf(new ConfigProvider {
+ new TransportConf(module, new ConfigProvider {
override def get(name: String): String = conf.get(name)
})
}
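
[Editor's note] A minimal usage sketch of the new signature, not part of the patch: thread-count defaults are now written under the module's namespace, and setIfMissing keeps any value the user already configured. The exact fallback value comes from defaultNumThreads, which is not shown in this hunk.

    import org.apache.spark.SparkConf
    import org.apache.spark.network.netty.SparkTransportConf

    // loadDefaults = false keeps the sketch self-contained
    val sparkConf = new SparkConf(false).set("spark.rpc.io.serverThreads", "8")
    val rpcConf = SparkTransportConf.fromSparkConf(sparkConf, "rpc", numUsableCores = 4)
    // explicit setting preserved by setIfMissing
    assert(rpcConf.serverThreads() == 8)
    // clientThreads falls back to defaultNumThreads(numUsableCores), per the code above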
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 09093819bb..3e0c497969 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -22,16 +22,13 @@ import java.net.{InetSocketAddress, URI}
import java.nio.ByteBuffer
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicBoolean
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy
+import javax.annotation.Nullable
-import scala.collection.mutable
import scala.concurrent.{Future, Promise}
import scala.reflect.ClassTag
import scala.util.{DynamicVariable, Failure, Success}
import scala.util.control.NonFatal
-import com.google.common.base.Preconditions
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.network.TransportContext
import org.apache.spark.network.client._
@@ -49,7 +46,8 @@ private[netty] class NettyRpcEnv(
securityManager: SecurityManager) extends RpcEnv(conf) with Logging {
private val transportConf = SparkTransportConf.fromSparkConf(
- conf.clone.set("spark.shuffle.io.numConnectionsPerPeer", "1"),
+ conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"),
+ "rpc",
conf.getInt("spark.rpc.io.threads", 0))
private val dispatcher: Dispatcher = new Dispatcher(this)
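
[Editor's note] A hedged sketch of what the change above accomplishes: the RPC env pins spark.rpc.io.numConnectionsPerPeer to 1 on a cloned conf, so a shuffle-scoped fan-out setting no longer leaks into RPC connections.

    import org.apache.spark.SparkConf
    import org.apache.spark.network.netty.SparkTransportConf

    val conf = new SparkConf(false).set("spark.shuffle.io.numConnectionsPerPeer", "4")
    val rpcConf = SparkTransportConf.fromSparkConf(
      conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"), "rpc",
      conf.getInt("spark.rpc.io.threads", 0))
    // the shuffle key is simply invisible to the "rpc" module
    assert(rpcConf.numConnectionsPerPeer() == 1)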
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 2de9b6a651..7d08eae0b4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -109,7 +109,7 @@ private[spark] class CoarseMesosSchedulerBackend(
private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
if (shuffleServiceEnabled) {
Some(new MesosExternalShuffleClient(
- SparkTransportConf.fromSparkConf(conf),
+ SparkTransportConf.fromSparkConf(conf, "shuffle"),
securityManager,
securityManager.isAuthenticationEnabled(),
securityManager.isSaslEncryptionEnabled()))
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
index 39fadd8783..cc5f933393 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
@@ -46,7 +46,7 @@ private[spark] trait ShuffleWriterGroup {
private[spark] class FileShuffleBlockResolver(conf: SparkConf)
extends ShuffleBlockResolver with Logging {
- private val transportConf = SparkTransportConf.fromSparkConf(conf)
+ private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
private lazy val blockManager = SparkEnv.get.blockManager
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index 05b1eed7f3..fadb8fe7ed 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -47,7 +47,7 @@ private[spark] class IndexShuffleBlockResolver(
private lazy val blockManager = Option(_blockManager).getOrElse(SparkEnv.get.blockManager)
- private val transportConf = SparkTransportConf.fromSparkConf(conf)
+ private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
def getDataFile(shuffleId: Int, mapId: Int): File = {
blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 661c706af3..ab0007fb78 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -122,7 +122,7 @@ private[spark] class BlockManager(
// Client to read other executors' shuffle files. This is either an external service, or just the
// standard BlockTransferService to directly connect to other Executors.
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
- val transConf = SparkTransportConf.fromSparkConf(conf, numUsableCores)
+ val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(),
securityManager.isSaslEncryptionEnabled())
} else {
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index 231f4631e0..1c775bcb3d 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -35,7 +35,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
var rpcHandler: ExternalShuffleBlockHandler = _
override def beforeAll() {
- val transportConf = SparkTransportConf.fromSparkConf(conf, numUsableCores = 2)
+ val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 2)
rpcHandler = new ExternalShuffleBlockHandler(transportConf, null)
val transportContext = new TransportContext(transportConf, rpcHandler)
server = transportContext.createServer()
diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 3b2eff3779..115135d44a 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -23,18 +23,53 @@ import com.google.common.primitives.Ints;
* A central location that tracks all the settings we expose to users.
*/
public class TransportConf {
+
+ private final String SPARK_NETWORK_IO_MODE_KEY;
+ private final String SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY;
+ private final String SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY;
+ private final String SPARK_NETWORK_IO_BACKLOG_KEY;
+ private final String SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY;
+ private final String SPARK_NETWORK_IO_SERVERTHREADS_KEY;
+ private final String SPARK_NETWORK_IO_CLIENTTHREADS_KEY;
+ private final String SPARK_NETWORK_IO_RECEIVEBUFFER_KEY;
+ private final String SPARK_NETWORK_IO_SENDBUFFER_KEY;
+ private final String SPARK_NETWORK_SASL_TIMEOUT_KEY;
+ private final String SPARK_NETWORK_IO_MAXRETRIES_KEY;
+ private final String SPARK_NETWORK_IO_RETRYWAIT_KEY;
+ private final String SPARK_NETWORK_IO_LAZYFD_KEY;
+
private final ConfigProvider conf;
- public TransportConf(ConfigProvider conf) {
+ private final String module;
+
+ public TransportConf(String module, ConfigProvider conf) {
+ this.module = module;
this.conf = conf;
+ SPARK_NETWORK_IO_MODE_KEY = getConfKey("io.mode");
+ SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY = getConfKey("io.preferDirectBufs");
+ SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY = getConfKey("io.connectionTimeout");
+ SPARK_NETWORK_IO_BACKLOG_KEY = getConfKey("io.backLog");
+ SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY = getConfKey("io.numConnectionsPerPeer");
+ SPARK_NETWORK_IO_SERVERTHREADS_KEY = getConfKey("io.serverThreads");
+ SPARK_NETWORK_IO_CLIENTTHREADS_KEY = getConfKey("io.clientThreads");
+ SPARK_NETWORK_IO_RECEIVEBUFFER_KEY = getConfKey("io.receiveBuffer");
+ SPARK_NETWORK_IO_SENDBUFFER_KEY = getConfKey("io.sendBuffer");
+ SPARK_NETWORK_SASL_TIMEOUT_KEY = getConfKey("sasl.timeout");
+ SPARK_NETWORK_IO_MAXRETRIES_KEY = getConfKey("io.maxRetries");
+ SPARK_NETWORK_IO_RETRYWAIT_KEY = getConfKey("io.retryWait");
+ SPARK_NETWORK_IO_LAZYFD_KEY = getConfKey("io.lazyFD");
+ }
+
+ private String getConfKey(String suffix) {
+ return "spark." + module + "." + suffix;
}
/** IO mode: nio or epoll */
- public String ioMode() { return conf.get("spark.shuffle.io.mode", "NIO").toUpperCase(); }
+ public String ioMode() { return conf.get(SPARK_NETWORK_IO_MODE_KEY, "NIO").toUpperCase(); }
/** If true, we will prefer allocating off-heap byte buffers within Netty. */
public boolean preferDirectBufs() {
- return conf.getBoolean("spark.shuffle.io.preferDirectBufs", true);
+ return conf.getBoolean(SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY, true);
}
/** Connect timeout in milliseconds. Default 120 secs. */
@@ -42,23 +77,23 @@ public class TransportConf {
long defaultNetworkTimeoutS = JavaUtils.timeStringAsSec(
conf.get("spark.network.timeout", "120s"));
long defaultTimeoutMs = JavaUtils.timeStringAsSec(
- conf.get("spark.shuffle.io.connectionTimeout", defaultNetworkTimeoutS + "s")) * 1000;
+ conf.get(SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY, defaultNetworkTimeoutS + "s")) * 1000;
return (int) defaultTimeoutMs;
}
/** Number of concurrent connections between two nodes for fetching data. */
public int numConnectionsPerPeer() {
- return conf.getInt("spark.shuffle.io.numConnectionsPerPeer", 1);
+ return conf.getInt(SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY, 1);
}
/** Requested maximum length of the queue of incoming connections. Default -1 for no backlog. */
- public int backLog() { return conf.getInt("spark.shuffle.io.backLog", -1); }
+ public int backLog() { return conf.getInt(SPARK_NETWORK_IO_BACKLOG_KEY, -1); }
/** Number of threads used in the server thread pool. Default to 0, which is 2x#cores. */
- public int serverThreads() { return conf.getInt("spark.shuffle.io.serverThreads", 0); }
+ public int serverThreads() { return conf.getInt(SPARK_NETWORK_IO_SERVERTHREADS_KEY, 0); }
/** Number of threads used in the client thread pool. Default to 0, which is 2x#cores. */
- public int clientThreads() { return conf.getInt("spark.shuffle.io.clientThreads", 0); }
+ public int clientThreads() { return conf.getInt(SPARK_NETWORK_IO_CLIENTTHREADS_KEY, 0); }
/**
* Receive buffer size (SO_RCVBUF).
@@ -67,28 +102,28 @@ public class TransportConf {
* Assuming latency = 1ms, network_bandwidth = 10Gbps
* buffer size should be ~ 1.25MB
*/
- public int receiveBuf() { return conf.getInt("spark.shuffle.io.receiveBuffer", -1); }
+ public int receiveBuf() { return conf.getInt(SPARK_NETWORK_IO_RECEIVEBUFFER_KEY, -1); }
/** Send buffer size (SO_SNDBUF). */
- public int sendBuf() { return conf.getInt("spark.shuffle.io.sendBuffer", -1); }
+ public int sendBuf() { return conf.getInt(SPARK_NETWORK_IO_SENDBUFFER_KEY, -1); }
/** Timeout for a single round trip of SASL token exchange, in milliseconds. */
public int saslRTTimeoutMs() {
- return (int) JavaUtils.timeStringAsSec(conf.get("spark.shuffle.sasl.timeout", "30s")) * 1000;
+ return (int) JavaUtils.timeStringAsSec(conf.get(SPARK_NETWORK_SASL_TIMEOUT_KEY, "30s")) * 1000;
}
/**
* Max number of times we will try IO exceptions (such as connection timeouts) per request.
* If set to 0, we will not do any retries.
*/
- public int maxIORetries() { return conf.getInt("spark.shuffle.io.maxRetries", 3); }
+ public int maxIORetries() { return conf.getInt(SPARK_NETWORK_IO_MAXRETRIES_KEY, 3); }
/**
* Time (in milliseconds) that we will wait in order to perform a retry after an IOException.
* Only relevant if maxIORetries > 0.
*/
public int ioRetryWaitTimeMs() {
- return (int) JavaUtils.timeStringAsSec(conf.get("spark.shuffle.io.retryWait", "5s")) * 1000;
+ return (int) JavaUtils.timeStringAsSec(conf.get(SPARK_NETWORK_IO_RETRYWAIT_KEY, "5s")) * 1000;
}
/**
@@ -101,11 +136,11 @@ public class TransportConf {
}
/**
- * Whether to initialize shuffle FileDescriptor lazily or not. If true, file descriptors are
+ * Whether to initialize FileDescriptor lazily or not. If true, file descriptors are
* created only when data is going to be transferred. This can reduce the number of open files.
*/
public boolean lazyFileDescriptor() {
- return conf.getBoolean("spark.shuffle.io.lazyFD", true);
+ return conf.getBoolean(SPARK_NETWORK_IO_LAZYFD_KEY, true);
}
/**
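
[Editor's note] The getConfKey scheme above means every accessor resolves "spark.<module>.<suffix>". A small Scala mirror of the derivation, for illustration only:

    // mirrors TransportConf.getConfKey: "spark." + module + "." + suffix
    def confKey(module: String, suffix: String): String = s"spark.$module.$suffix"

    assert(confKey("rpc", "io.maxRetries") == "spark.rpc.io.maxRetries")
    assert(confKey("shuffle", "io.lazyFD") == "spark.shuffle.io.lazyFD")
    assert(confKey("rpc", "sasl.timeout") == "spark.rpc.sasl.timeout")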
diff --git a/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java b/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
index dfb7740344..dc5fa1cee6 100644
--- a/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
@@ -83,7 +83,7 @@ public class ChunkFetchIntegrationSuite {
fp.write(fileContent);
fp.close();
- final TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+ final TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
fileChunk = new FileSegmentManagedBuffer(conf, testFile, 10, testFile.length() - 25);
streamManager = new StreamManager() {
diff --git a/network/common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java b/network/common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java
index 84ebb337e6..42955ef692 100644
--- a/network/common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java
@@ -60,7 +60,7 @@ public class RequestTimeoutIntegrationSuite {
public void setUp() throws Exception {
Map<String, String> configMap = Maps.newHashMap();
configMap.put("spark.shuffle.io.connectionTimeout", "2s");
- conf = new TransportConf(new MapConfigProvider(configMap));
+ conf = new TransportConf("shuffle", new MapConfigProvider(configMap));
defaultManager = new StreamManager() {
@Override
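
[Editor's note] Sketch of the module scoping this test relies on, assuming the connectionTimeoutMs() accessor shown in the TransportConf hunk above: a "shuffle"-prefixed key is invisible to an "rpc" TransportConf, which then falls back to spark.network.timeout (120s by default).

    import org.apache.spark.network.util.{ConfigProvider, TransportConf}

    val settings = Map("spark.shuffle.io.connectionTimeout" -> "2s")
    val provider = new ConfigProvider {
      override def get(name: String): String =
        settings.getOrElse(name, throw new NoSuchElementException(name))
    }
    assert(new TransportConf("shuffle", provider).connectionTimeoutMs() == 2 * 1000)
    assert(new TransportConf("rpc", provider).connectionTimeoutMs() == 120 * 1000)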
diff --git a/network/common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java b/network/common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
index 64b457b4b3..8eb56bdd98 100644
--- a/network/common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
@@ -49,7 +49,7 @@ public class RpcIntegrationSuite {
@BeforeClass
public static void setUp() throws Exception {
- TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+ TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
rpcHandler = new RpcHandler() {
@Override
public void receive(TransportClient client, byte[] message, RpcResponseCallback callback) {
diff --git a/network/common/src/test/java/org/apache/spark/network/StreamSuite.java b/network/common/src/test/java/org/apache/spark/network/StreamSuite.java
index 6dcec831de..00158fd081 100644
--- a/network/common/src/test/java/org/apache/spark/network/StreamSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/StreamSuite.java
@@ -89,7 +89,7 @@ public class StreamSuite {
fp.close();
}
- final TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+ final TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
final StreamManager streamManager = new StreamManager() {
@Override
public ManagedBuffer getChunk(long streamId, int chunkIndex) {
diff --git a/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java b/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
index f447137419..dac7d4a5b0 100644
--- a/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
@@ -52,7 +52,7 @@ public class TransportClientFactorySuite {
@Before
public void setUp() {
- conf = new TransportConf(new SystemPropertyConfigProvider());
+ conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
RpcHandler rpcHandler = new NoOpRpcHandler();
context = new TransportContext(conf, rpcHandler);
server1 = context.createServer();
@@ -76,7 +76,7 @@ public class TransportClientFactorySuite {
Map<String, String> configMap = Maps.newHashMap();
configMap.put("spark.shuffle.io.numConnectionsPerPeer", Integer.toString(maxConnections));
- TransportConf conf = new TransportConf(new MapConfigProvider(configMap));
+ TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(configMap));
RpcHandler rpcHandler = new NoOpRpcHandler();
TransportContext context = new TransportContext(conf, rpcHandler);
@@ -182,7 +182,7 @@ public class TransportClientFactorySuite {
@Test
public void closeIdleConnectionForRequestTimeOut() throws IOException, InterruptedException {
- TransportConf conf = new TransportConf(new ConfigProvider() {
+ TransportConf conf = new TransportConf("shuffle", new ConfigProvider() {
@Override
public String get(String name) {
diff --git a/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java b/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
index 3469e84e7f..b146899670 100644
--- a/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
@@ -207,7 +207,7 @@ public class SparkSaslSuite {
public void testEncryptedMessageChunking() throws Exception {
File file = File.createTempFile("sasltest", ".txt");
try {
- TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+ TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
byte[] data = new byte[8 * 1024];
new Random().nextBytes(data);
@@ -242,7 +242,7 @@ public class SparkSaslSuite {
final File file = File.createTempFile("sasltest", ".txt");
SaslTestCtx ctx = null;
try {
- final TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+ final TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
StreamManager sm = mock(StreamManager.class);
when(sm.getChunk(anyLong(), anyInt())).thenAnswer(new Answer<ManagedBuffer>() {
@Override
@@ -368,7 +368,7 @@ public class SparkSaslSuite {
boolean disableClientEncryption)
throws Exception {
- TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+ TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
SecretKeyHolder keyHolder = mock(SecretKeyHolder.class);
when(keyHolder.getSaslUser(anyString())).thenReturn("user");
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
index c393a5e1e6..1c2fa4d0d4 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
@@ -70,7 +70,7 @@ public class SaslIntegrationSuite {
@BeforeClass
public static void beforeAll() throws IOException {
- conf = new TransportConf(new SystemPropertyConfigProvider());
+ conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
context = new TransportContext(conf, new TestRpcHandler());
secretKeyHolder = mock(SecretKeyHolder.class);
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
index 3c6cb367de..a9958232a1 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
@@ -42,7 +42,7 @@ public class ExternalShuffleBlockResolverSuite {
static TestShuffleDataContext dataContext;
- static TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+ static TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
@BeforeClass
public static void beforeAll() throws IOException {
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
index 2f4f1d0df4..532d7ab8d0 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
@@ -35,7 +35,7 @@ public class ExternalShuffleCleanupSuite {
// Same-thread Executor used to ensure cleanup happens synchronously in test thread.
Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
- TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+ TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
@Test
public void noCleanupAndCleanup() throws IOException {
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index a3f9a38b1a..2095f41d79 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -91,7 +91,7 @@ public class ExternalShuffleIntegrationSuite {
dataContext1.create();
dataContext1.insertHashShuffleData(1, 0, exec1Blocks);
- conf = new TransportConf(new SystemPropertyConfigProvider());
+ conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
handler = new ExternalShuffleBlockHandler(conf, null);
TransportContext transportContext = new TransportContext(conf, handler);
server = transportContext.createServer();
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
index aa99efda94..08ddb3755b 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
@@ -39,7 +39,7 @@ import org.apache.spark.network.util.TransportConf;
public class ExternalShuffleSecuritySuite {
- TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+ TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
TransportServer server;
@Before
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
index 06e46f9241..3a6ef0d3f8 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
@@ -254,7 +254,7 @@ public class RetryingBlockFetcherSuite {
BlockFetchingListener listener)
throws IOException {
- TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+ TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
BlockFetchStarter fetchStarter = mock(BlockFetchStarter.class);
Stubber stub = null;
diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index 11ea7f3fd3..ba6d30a74c 100644
--- a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -120,7 +120,7 @@ public class YarnShuffleService extends AuxiliaryService {
registeredExecutorFile =
findRegisteredExecutorFile(conf.getStrings("yarn.nodemanager.local-dirs"));
- TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf));
+ TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
// If authentication is enabled, set up the shuffle server to use a
// special RPC handler that filters out unauthenticated fetch requests
boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
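
[Editor's note] The YARN shuffle service keeps the "shuffle" module name, so existing spark.shuffle.io.* settings in the NodeManager's Hadoop configuration still apply. A hedged sketch, assuming the HadoopConfigProvider referenced above lives in the yarn module's util package:

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.network.util.TransportConf
    import org.apache.spark.network.yarn.util.HadoopConfigProvider

    val hadoopConf = new Configuration(false)
    hadoopConf.set("spark.shuffle.io.serverThreads", "16")
    val transportConf = new TransportConf("shuffle", new HadoopConfigProvider(hadoopConf))
    // resolved as spark.shuffle.io.serverThreads, unchanged by this patch
    assert(transportConf.serverThreads() == 16)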