aboutsummaryrefslogtreecommitdiff
path: root/mesos
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2016-11-28 21:10:57 -0800
committerShixiong Zhu <shixiong@databricks.com>2016-11-28 21:10:57 -0800
commit8b325b17ecdf013b7a6edcb7ee3773546bd914df (patch)
treee2826f751402537582646f88fe3b905783fa2f7e /mesos
parent1633ff3b6c97e33191859f34c868782cbb0972fd (diff)
downloadspark-8b325b17ecdf013b7a6edcb7ee3773546bd914df.tar.gz
spark-8b325b17ecdf013b7a6edcb7ee3773546bd914df.tar.bz2
spark-8b325b17ecdf013b7a6edcb7ee3773546bd914df.zip
[SPARK-18547][CORE] Propagate I/O encryption key when executors register.
This change modifies the method used to propagate encryption keys used during shuffle. Instead of relying on YARN's UserGroupInformation credential propagation, this change explicitly distributes the key using the messages exchanged between driver and executor during registration. When RPC encryption is enabled, this means key propagation is also secure. This allows shuffle encryption to work in non-YARN mode, which means that it's easier to write unit tests for areas of the code that are affected by the feature. The key is stored in the SecurityManager; because there are many instances of that class used in the code, the key is only guaranteed to exist in the instance managed by the SparkEnv. This path was chosen to avoid storing the key in the SparkConf, which would risk having the key being written to disk as part of the configuration (as, for example, is done when starting YARN applications). Tested by new and existing unit tests (which were moved from the YARN module to core), and by running apps with shuffle encryption enabled. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #15981 from vanzin/SPARK-18547.
Diffstat (limited to 'mesos')
-rw-r--r--mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala2
-rw-r--r--mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala4
-rw-r--r--mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala11
3 files changed, 15 insertions, 2 deletions
diff --git a/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index 1937bd30ba..ee9149ce02 100644
--- a/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -75,7 +75,7 @@ private[spark] class MesosExecutorBackend
val conf = new SparkConf(loadDefaults = true).setAll(properties)
val port = conf.getInt("spark.executor.port", 0)
val env = SparkEnv.createExecutorEnv(
- conf, executorId, slaveInfo.getHostname, port, cpusPerTask, isLocal = false)
+ conf, executorId, slaveInfo.getHostname, port, cpusPerTask, None, isLocal = false)
executor = new Executor(
executorId,
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala
index a849c4afa2..ed29b346ba 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala
@@ -18,6 +18,7 @@
package org.apache.spark.scheduler.cluster.mesos
import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.internal.config._
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
/**
@@ -37,6 +38,9 @@ private[spark] class MesosClusterManager extends ExternalClusterManager {
override def createSchedulerBackend(sc: SparkContext,
masterURL: String,
scheduler: TaskScheduler): SchedulerBackend = {
+ require(!sc.conf.get(IO_ENCRYPTION_ENABLED),
+ "I/O encryption is currently not supported in Mesos.")
+
val mesosUrl = MESOS_REGEX.findFirstMatchIn(masterURL).get.group(1)
val coarse = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)
if (coarse) {
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala
index 6fce06632c..a55855428b 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala
@@ -17,7 +17,8 @@
package org.apache.spark.scheduler.cluster.mesos
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark._
+import org.apache.spark.internal.config._
class MesosClusterManagerSuite extends SparkFunSuite with LocalSparkContext {
def testURL(masterURL: String, expectedClass: Class[_], coarse: Boolean) {
@@ -44,4 +45,12 @@ class MesosClusterManagerSuite extends SparkFunSuite with LocalSparkContext {
classOf[MesosFineGrainedSchedulerBackend],
coarse = false)
}
+
+ test("mesos with i/o encryption throws error") {
+ val se = intercept[SparkException] {
+ val conf = new SparkConf().setAppName("test").set(IO_ENCRYPTION_ENABLED, true)
+ sc = new SparkContext("mesos", "test", conf)
+ }
+ assert(se.getCause().isInstanceOf[IllegalArgumentException])
+ }
}