diff options
author | Marcelo Vanzin <vanzin@cloudera.com> | 2016-11-28 21:10:57 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-11-28 21:10:57 -0800 |
commit | 8b325b17ecdf013b7a6edcb7ee3773546bd914df (patch) | |
tree | e2826f751402537582646f88fe3b905783fa2f7e /mesos | |
parent | 1633ff3b6c97e33191859f34c868782cbb0972fd (diff) | |
download | spark-8b325b17ecdf013b7a6edcb7ee3773546bd914df.tar.gz spark-8b325b17ecdf013b7a6edcb7ee3773546bd914df.tar.bz2 spark-8b325b17ecdf013b7a6edcb7ee3773546bd914df.zip |
[SPARK-18547][CORE] Propagate I/O encryption key when executors register.
This change modifies how encryption keys used during shuffle are propagated.
Instead of relying on YARN's UserGroupInformation credential propagation,
this change explicitly distributes the key using the messages exchanged between
driver and executor during registration. When RPC encryption is enabled, this means
key propagation is also secure.
This allows shuffle encryption to work in non-YARN mode, which means that it's
easier to write unit tests for areas of the code that are affected by the feature.
The key is stored in the SecurityManager; because there are many instances of
that class used in the code, the key is only guaranteed to exist in the instance
managed by the SparkEnv. This path was chosen to avoid storing the key in the
SparkConf, which would risk having the key being written to disk as part of the
configuration (as, for example, is done when starting YARN applications).
Tested by new and existing unit tests (which were moved from the YARN module to
core), and by running apps with shuffle encryption enabled.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #15981 from vanzin/SPARK-18547.
Diffstat (limited to 'mesos')
3 files changed, 15 insertions, 2 deletions
diff --git a/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala index 1937bd30ba..ee9149ce02 100644 --- a/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala +++ b/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala @@ -75,7 +75,7 @@ private[spark] class MesosExecutorBackend val conf = new SparkConf(loadDefaults = true).setAll(properties) val port = conf.getInt("spark.executor.port", 0) val env = SparkEnv.createExecutorEnv( - conf, executorId, slaveInfo.getHostname, port, cpusPerTask, isLocal = false) + conf, executorId, slaveInfo.getHostname, port, cpusPerTask, None, isLocal = false) executor = new Executor( executorId, diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala index a849c4afa2..ed29b346ba 100644 --- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala +++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala @@ -18,6 +18,7 @@ package org.apache.spark.scheduler.cluster.mesos import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.internal.config._ import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} /** @@ -37,6 +38,9 @@ private[spark] class MesosClusterManager extends ExternalClusterManager { override def createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler): SchedulerBackend = { + require(!sc.conf.get(IO_ENCRYPTION_ENABLED), + "I/O encryption is currently not supported in Mesos.") + val mesosUrl = MESOS_REGEX.findFirstMatchIn(masterURL).get.group(1) val coarse = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true) if (coarse) { diff --git 
a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala index 6fce06632c..a55855428b 100644 --- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala +++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala @@ -17,7 +17,8 @@ package org.apache.spark.scheduler.cluster.mesos -import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark._ +import org.apache.spark.internal.config._ class MesosClusterManagerSuite extends SparkFunSuite with LocalSparkContext { def testURL(masterURL: String, expectedClass: Class[_], coarse: Boolean) { @@ -44,4 +45,12 @@ class MesosClusterManagerSuite extends SparkFunSuite with LocalSparkContext { classOf[MesosFineGrainedSchedulerBackend], coarse = false) } + + test("mesos with i/o encryption throws error") { + val se = intercept[SparkException] { + val conf = new SparkConf().setAppName("test").set(IO_ENCRYPTION_ENABLED, true) + sc = new SparkContext("mesos", "test", conf) + } + assert(se.getCause().isInstanceOf[IllegalArgumentException]) + } } |