aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala44
1 files changed, 37 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
index ce4225cae6..cef203006d 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
@@ -20,8 +20,25 @@ package org.apache.spark.network.netty
import org.apache.spark.SparkConf
import org.apache.spark.network.util.{TransportConf, ConfigProvider}
+/**
+ * Provides a utility for transforming from a SparkConf inside a Spark JVM (e.g., Executor,
+ * Driver, or a standalone shuffle service) into a TransportConf with details on our environment
+ * like the number of cores that are allocated to this JVM.
+ */
object SparkTransportConf {
/**
+ * Specifies an upper bound on the number of Netty threads that Spark requires by default.
+ * In practice, only 2-4 cores should be required to transfer roughly 10 Gb/s, and each core
+ * that we use will have an initial overhead of roughly 32 MB of off-heap memory, which comes
+ * at a premium.
+ *
+ * Thus, this value should still retain maximum throughput and reduce wasted off-heap memory
+ * allocation. It can be overridden by setting the number of serverThreads and clientThreads
+ * manually in Spark's configuration.
+ */
+ private val MAX_DEFAULT_NETTY_THREADS = 8
+
+ /**
* Utility for creating a [[TransportConf]] from a [[SparkConf]].
* @param numUsableCores if nonzero, this will restrict the server and client threads to only
* use the given number of cores, rather than all of the machine's cores.
@@ -29,15 +46,28 @@ object SparkTransportConf {
*/
def fromSparkConf(_conf: SparkConf, numUsableCores: Int = 0): TransportConf = {
val conf = _conf.clone
- if (numUsableCores > 0) {
- // Only set if serverThreads/clientThreads not already set.
- conf.set("spark.shuffle.io.serverThreads",
- conf.get("spark.shuffle.io.serverThreads", numUsableCores.toString))
- conf.set("spark.shuffle.io.clientThreads",
- conf.get("spark.shuffle.io.clientThreads", numUsableCores.toString))
- }
+
+ // Specify thread configuration based on our JVM's allocation of cores (rather than necessarily
+ // assuming we have all the machine's cores).
+ // NB: Only set if serverThreads/clientThreads not already set.
+ val numThreads = defaultNumThreads(numUsableCores)
+ conf.set("spark.shuffle.io.serverThreads",
+ conf.get("spark.shuffle.io.serverThreads", numThreads.toString))
+ conf.set("spark.shuffle.io.clientThreads",
+ conf.get("spark.shuffle.io.clientThreads", numThreads.toString))
+
new TransportConf(new ConfigProvider {
override def get(name: String): String = conf.get(name)
})
}
+
+ /**
+ * Returns the default number of threads for both the Netty client and server thread pools.
+ * If numUsableCores is 0, we will use Runtime get an approximate number of available cores.
+ */
+ private def defaultNumThreads(numUsableCores: Int): Int = {
+ val availableCores =
+ if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
+ math.min(availableCores, MAX_DEFAULT_NETTY_THREADS)
+ }
}