aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
Diffstat (limited to 'yarn')
-rw-r--r--yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala16
-rw-r--r--yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala16
2 files changed, 32 insertions, 0 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 7ee4b5c842..5f47c79cab 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
import org.apache.spark.{SecurityManager, SparkConf, Logging}
+import org.apache.spark.network.sasl.ShuffleSecretManager
@deprecated("use yarn/stable", "1.2.0")
class ExecutorRunnable(
@@ -90,6 +91,21 @@ class ExecutorRunnable(
ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))
+ // If external shuffle service is enabled, register with the Yarn shuffle service already
+ // started on the NodeManager and, if authentication is enabled, provide it with our secret
+ // key for fetching shuffle files later
+ if (sparkConf.getBoolean("spark.shuffle.service.enabled", false)) {
+ val secretString = securityMgr.getSecretKey()
+ val secretBytes =
+ if (secretString != null) {
+ ShuffleSecretManager.stringToBytes(secretString)
+ } else {
+ // Authentication is not enabled, so just provide dummy metadata
+ ByteBuffer.allocate(0)
+ }
+ ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> secretBytes))
+ }
+
// Send the start request to the ContainerManager
val startReq = Records.newRecord(classOf[StartContainerRequest])
.asInstanceOf[StartContainerRequest]
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 0b5a92d87d..18f48b4b6c 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
import org.apache.spark.{SecurityManager, SparkConf, Logging}
+import org.apache.spark.network.sasl.ShuffleSecretManager
class ExecutorRunnable(
@@ -89,6 +90,21 @@ class ExecutorRunnable(
ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))
+ // If external shuffle service is enabled, register with the Yarn shuffle service already
+ // started on the NodeManager and, if authentication is enabled, provide it with our secret
+ // key for fetching shuffle files later
+ if (sparkConf.getBoolean("spark.shuffle.service.enabled", false)) {
+ val secretString = securityMgr.getSecretKey()
+ val secretBytes =
+ if (secretString != null) {
+ ShuffleSecretManager.stringToBytes(secretString)
+ } else {
+ // Authentication is not enabled, so just provide dummy metadata
+ ByteBuffer.allocate(0)
+ }
+ ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> secretBytes))
+ }
+
// Send the start request to the ContainerManager
nmClient.startContainer(container, ctx)
}