From f8d93edec82eedab59d50aec06ca2de7e4cf14f6 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Sun, 1 Nov 2015 15:57:42 -0800
Subject: [SPARK-11073][CORE][YARN] Remove akka dependency in secret key generation.

Use standard JDK APIs for that (with a little help from Guava). Most of
the changes here are in test code, since there were no tests specific to
that part of the code.

Author: Marcelo Vanzin

Closes #9257 from vanzin/SPARK-11073.
---
 .../scala/org/apache/spark/SecurityManager.scala   | 72 ++++++++++++----------
 .../org/apache/spark/SecurityManagerSuite.scala    | 23 ++++++-
 .../spark/deploy/LogUrlsStandaloneSuite.scala      | 13 +---
 .../spark/deploy/worker/WorkerArgumentsTest.scala  | 28 +--------
 .../org/apache/spark/storage/LocalDirsSuite.scala  | 16 +----
 .../org/apache/spark/util/SparkConfWithEnv.scala   | 34 ++++++++++
 6 files changed, 105 insertions(+), 81 deletions(-)
 create mode 100644 core/src/test/scala/org/apache/spark/util/SparkConfWithEnv.scala

(limited to 'core')

diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 746d2081d4..64e483e384 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark
 
+import java.lang.{Byte => JByte}
 import java.net.{Authenticator, PasswordAuthentication}
-import java.security.KeyStore
+import java.security.{KeyStore, SecureRandom}
 import java.security.cert.X509Certificate
 import javax.net.ssl._
 
+import com.google.common.hash.HashCodes
 import com.google.common.io.Files
 import org.apache.hadoop.io.Text
 
@@ -130,15 +132,16 @@ import org.apache.spark.util.Utils
  *
  * The exact mechanisms used to generate/distribute the shared secret are deployment-specific.
  *
- * For Yarn deployments, the secret is automatically generated using the Akka remote
- * Crypt.generateSecureCookie() API. The secret is placed in the Hadoop UGI which gets passed
- * around via the Hadoop RPC mechanism. Hadoop RPC can be configured to support different levels
- * of protection. See the Hadoop documentation for more details. Each Spark application on Yarn
- * gets a different shared secret. On Yarn, the Spark UI gets configured to use the Hadoop Yarn
- * AmIpFilter which requires the user to go through the ResourceManager Proxy. That Proxy is there
- * to reduce the possibility of web based attacks through YARN. Hadoop can be configured to use
- * filters to do authentication. That authentication then happens via the ResourceManager Proxy
- * and Spark will use that to do authorization against the view acls.
+ * For YARN deployments, the secret is automatically generated. The secret is placed in the Hadoop
+ * UGI which gets passed around via the Hadoop RPC mechanism. Hadoop RPC can be configured to
+ * support different levels of protection. See the Hadoop documentation for more details. Each
+ * Spark application on YARN gets a different shared secret.
+ *
+ * On YARN, the Spark UI gets configured to use the Hadoop YARN AmIpFilter which requires the user
+ * to go through the ResourceManager Proxy. That proxy is there to reduce the possibility of web
+ * based attacks through YARN. Hadoop can be configured to use filters to do authentication. That
+ * authentication then happens via the ResourceManager Proxy and Spark will use that to do
+ * authorization against the view acls.
  *
  * For other Spark deployments, the shared secret must be specified via the
  * spark.authenticate.secret config.
@@ -189,8 +192,7 @@ import org.apache.spark.util.Utils
 private[spark] class SecurityManager(sparkConf: SparkConf)
   extends Logging with SecretKeyHolder {
 
-  // key used to store the spark secret in the Hadoop UGI
-  private val sparkSecretLookupKey = "sparkCookie"
+  import SecurityManager._
 
   private val authOn = sparkConf.getBoolean(SecurityManager.SPARK_AUTH_CONF, false)
   // keep spark.ui.acls.enable for backwards compatibility with 1.0
@@ -365,33 +367,38 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
    * we throw an exception.
    */
   private def generateSecretKey(): String = {
-    if (!isAuthenticationEnabled) return null
-    // first check to see if the secret is already set, else generate a new one if on yarn
-    val sCookie = if (SparkHadoopUtil.get.isYarnMode) {
-      val secretKey = SparkHadoopUtil.get.getSecretKeyFromUserCredentials(sparkSecretLookupKey)
-      if (secretKey != null) {
-        logDebug("in yarn mode, getting secret from credentials")
-        return new Text(secretKey).toString
+    if (!isAuthenticationEnabled) {
+      null
+    } else if (SparkHadoopUtil.get.isYarnMode) {
+      // In YARN mode, the secure cookie will be created by the driver and stashed in the
+      // user's credentials, where executors can get it. The check for an array of size 0
+      // is because of the test code in YarnSparkHadoopUtilSuite.
+      val secretKey = SparkHadoopUtil.get.getSecretKeyFromUserCredentials(SECRET_LOOKUP_KEY)
+      if (secretKey == null || secretKey.length == 0) {
+        logDebug("generateSecretKey: yarn mode, secret key from credentials is null")
+        val rnd = new SecureRandom()
+        val length = sparkConf.getInt("spark.authenticate.secretBitLength", 256) / JByte.SIZE
+        val secret = new Array[Byte](length)
+        rnd.nextBytes(secret)
+
+        val cookie = HashCodes.fromBytes(secret).toString()
+        SparkHadoopUtil.get.addSecretKeyToUserCredentials(SECRET_LOOKUP_KEY, cookie)
+        cookie
       } else {
-        logDebug("getSecretKey: yarn mode, secret key from credentials is null")
+        new Text(secretKey).toString
       }
-      val cookie = akka.util.Crypt.generateSecureCookie
-      // if we generated the secret then we must be the first so lets set it so t
-      // gets used by everyone else
-      SparkHadoopUtil.get.addSecretKeyToUserCredentials(sparkSecretLookupKey, cookie)
-      logInfo("adding secret to credentials in yarn mode")
-      cookie
     } else {
       // user must have set spark.authenticate.secret config
       // For Master/Worker, auth secret is in conf; for Executors, it is in env variable
-      sys.env.get(SecurityManager.ENV_AUTH_SECRET)
+      Option(sparkConf.getenv(SecurityManager.ENV_AUTH_SECRET))
         .orElse(sparkConf.getOption(SecurityManager.SPARK_AUTH_SECRET_CONF)) match {
         case Some(value) => value
-        case None => throw new Exception("Error: a secret key must be specified via the " +
-          SecurityManager.SPARK_AUTH_SECRET_CONF + " config")
+        case None =>
+          throw new IllegalArgumentException(
+            "Error: a secret key must be specified via the " +
+            SecurityManager.SPARK_AUTH_SECRET_CONF + " config")
       }
     }
-    sCookie
   }
 
   /**
@@ -475,6 +482,9 @@ private[spark] object SecurityManager {
   val SPARK_AUTH_CONF: String = "spark.authenticate"
   val SPARK_AUTH_SECRET_CONF: String = "spark.authenticate.secret"
   // This is used to set auth secret to an executor's env variable. It should have the same
-  // value as SPARK_AUTH_SECERET_CONF set in SparkConf
+  // value as SPARK_AUTH_SECRET_CONF set in SparkConf
   val ENV_AUTH_SECRET = "_SPARK_AUTH_SECRET"
+
+  // key used to store the spark secret in the Hadoop UGI
+  val SECRET_LOOKUP_KEY = "sparkCookie"
 }
diff --git a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
index f29160d834..26b95c0678 100644
--- a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark
 
 import java.io.File
 
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SparkConfWithEnv, Utils}
 
 class SecurityManagerSuite extends SparkFunSuite {
 
@@ -223,5 +223,26 @@ class SecurityManagerSuite extends SparkFunSuite {
     assert(securityManager.hostnameVerifier.isDefined === false)
   }
 
+  test("missing secret authentication key") {
+    val conf = new SparkConf().set("spark.authenticate", "true")
+    intercept[IllegalArgumentException] {
+      new SecurityManager(conf)
+    }
+  }
+
+  test("secret authentication key") {
+    val key = "very secret key"
+    val conf = new SparkConf()
+      .set(SecurityManager.SPARK_AUTH_CONF, "true")
+      .set(SecurityManager.SPARK_AUTH_SECRET_CONF, key)
+    assert(key === new SecurityManager(conf).getSecretKey())
+
+    val keyFromEnv = "very secret key from env"
+    val conf2 = new SparkConfWithEnv(Map(SecurityManager.ENV_AUTH_SECRET -> keyFromEnv))
+      .set(SecurityManager.SPARK_AUTH_CONF, "true")
+      .set(SecurityManager.SPARK_AUTH_SECRET_CONF, key)
+    assert(keyFromEnv === new SecurityManager(conf2).getSecretKey())
+  }
+
 }
diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
index 86eb41dd7e..8dd31b4b6f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
@@ -25,6 +25,7 @@ import scala.io.Source
 
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListener}
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.util.SparkConfWithEnv
 
 class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
 
@@ -53,17 +54,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
 
   test("verify that log urls reflect SPARK_PUBLIC_DNS (SPARK-6175)") {
     val SPARK_PUBLIC_DNS = "public_dns"
-    class MySparkConf extends SparkConf(false) {
-      override def getenv(name: String): String = {
-        if (name == "SPARK_PUBLIC_DNS") SPARK_PUBLIC_DNS
-        else super.getenv(name)
-      }
-
-      override def clone: SparkConf = {
-        new MySparkConf().setAll(getAll)
-      }
-    }
-    val conf = new MySparkConf().set(
+    val conf = new SparkConfWithEnv(Map("SPARK_PUBLIC_DNS" -> SPARK_PUBLIC_DNS)).set(
       "spark.extraListeners", classOf[SaveExecutorInfo].getName)
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
 
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala
index 15f7ca4a6d..637e78fda0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.worker
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
-
+import org.apache.spark.util.SparkConfWithEnv
 
 class WorkerArgumentsTest extends SparkFunSuite {
 
@@ -34,18 +34,7 @@ class WorkerArgumentsTest extends SparkFunSuite {
 
   test("Memory can't be set to 0 when SPARK_WORKER_MEMORY env property leaves off M or G") {
     val args = Array("spark://localhost:0000 ")
-
-    class MySparkConf extends SparkConf(false) {
-      override def getenv(name: String): String = {
-        if (name == "SPARK_WORKER_MEMORY") "50000"
-        else super.getenv(name)
-      }
-
-      override def clone: SparkConf = {
-        new MySparkConf().setAll(getAll)
-      }
-    }
-    val conf = new MySparkConf()
+    val conf = new SparkConfWithEnv(Map("SPARK_WORKER_MEMORY" -> "50000"))
     intercept[IllegalStateException] {
       new WorkerArguments(args, conf)
     }
@@ -53,18 +42,7 @@ class WorkerArgumentsTest extends SparkFunSuite {
 
   test("Memory correctly set when SPARK_WORKER_MEMORY env property appends G") {
     val args = Array("spark://localhost:0000 ")
-
-    class MySparkConf extends SparkConf(false) {
-      override def getenv(name: String): String = {
-        if (name == "SPARK_WORKER_MEMORY") "5G"
-        else super.getenv(name)
-      }
-
-      override def clone: SparkConf = {
-        new MySparkConf().setAll(getAll)
-      }
-    }
-    val conf = new MySparkConf()
+    val conf = new SparkConfWithEnv(Map("SPARK_WORKER_MEMORY" -> "5G"))
     val workerArgs = new WorkerArguments(args, conf)
     assert(workerArgs.memory === 5120)
   }
diff --git a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
index ac6fec56bb..cc50289c7b 100644
--- a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.util.Utils
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
-
+import org.apache.spark.util.SparkConfWithEnv
 
 /**
  * Tests for the spark.local.dir and SPARK_LOCAL_DIRS configuration options.
@@ -45,20 +45,10 @@ class LocalDirsSuite extends SparkFunSuite with BeforeAndAfter {
 
   test("SPARK_LOCAL_DIRS override also affects driver") {
     // Regression test for SPARK-2975
     assert(!new File("/NONEXISTENT_DIR").exists())
-    // SPARK_LOCAL_DIRS is a valid directory:
-    class MySparkConf extends SparkConf(false) {
-      override def getenv(name: String): String = {
-        if (name == "SPARK_LOCAL_DIRS") System.getProperty("java.io.tmpdir")
-        else super.getenv(name)
-      }
-
-      override def clone: SparkConf = {
-        new MySparkConf().setAll(getAll)
-      }
-    }
     // spark.local.dir only contains invalid directories, but that's not a problem since
     // SPARK_LOCAL_DIRS will override it on both the driver and workers:
-    val conf = new MySparkConf().set("spark.local.dir", "/NONEXISTENT_PATH")
+    val conf = new SparkConfWithEnv(Map("SPARK_LOCAL_DIRS" -> System.getProperty("java.io.tmpdir")))
+      .set("spark.local.dir", "/NONEXISTENT_PATH")
     assert(new File(Utils.getLocalDir(conf)).exists())
   }
 
diff --git a/core/src/test/scala/org/apache/spark/util/SparkConfWithEnv.scala b/core/src/test/scala/org/apache/spark/util/SparkConfWithEnv.scala
new file mode 100644
index 0000000000..ddd5edf4f7
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/SparkConfWithEnv.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.apache.spark.SparkConf
+
+/**
+ * Customized SparkConf that allows env variables to be overridden.
+ */
+class SparkConfWithEnv(env: Map[String, String]) extends SparkConf(false) {
+  override def getenv(name: String): String = {
+    env.get(name).getOrElse(super.getenv(name))
+  }
+
+  override def clone: SparkConf = {
+    new SparkConfWithEnv(env).setAll(getAll)
+  }
+
+}
--
cgit v1.2.3
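Note: the core of the change above swaps akka.util.Crypt.generateSecureCookie for a SecureRandom-backed secret rendered as a string with Guava. A minimal standalone sketch of that generation step, using only APIs the patch itself imports (the object and method names below are illustrative, not part of the patch):

import java.lang.{Byte => JByte}
import java.security.SecureRandom

import com.google.common.hash.HashCodes

// Illustrative sketch only: draw `bitLength` random bits from SecureRandom and
// encode them as a hex string via Guava's HashCodes, mirroring generateSecretKey().
object SecretGenerationSketch {
  def newSecret(bitLength: Int = 256): String = {
    val rnd = new SecureRandom()
    val secret = new Array[Byte](bitLength / JByte.SIZE)
    rnd.nextBytes(secret)
    HashCodes.fromBytes(secret).toString()
  }
}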
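For non-YARN deployments the lookup order is unchanged in spirit but tightened: the executor-side _SPARK_AUTH_SECRET env variable is consulted first (now through SparkConf.getenv so tests can stub it), then spark.authenticate.secret, and a missing key raises IllegalArgumentException instead of a bare Exception. A hypothetical helper showing that order in isolation (the object name is made up; the constants come from SecurityManager):

package org.apache.spark

// Hypothetical standalone rendering of the non-YARN secret lookup used above:
// executor env variable first, then the spark.authenticate.secret config entry.
object SecretLookupSketch {
  def resolve(conf: SparkConf): String = {
    Option(conf.getenv(SecurityManager.ENV_AUTH_SECRET))
      .orElse(conf.getOption(SecurityManager.SPARK_AUTH_SECRET_CONF))
      .getOrElse(throw new IllegalArgumentException(
        "Error: a secret key must be specified via the " +
          SecurityManager.SPARK_AUTH_SECRET_CONF + " config"))
  }
}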
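The new SparkConfWithEnv helper is what lets the suites above drop their ad-hoc MySparkConf subclasses. A sketch of how a test might use it (the suite name is hypothetical; it sits under the org.apache.spark package because SparkConf.getenv is private[spark]):

package org.apache.spark.util

import org.apache.spark.{SparkConf, SparkFunSuite}

// Hypothetical suite: SparkConfWithEnv answers getenv() from the supplied map and
// falls back to the real environment for any other variable.
class SparkConfWithEnvUsageSuite extends SparkFunSuite {
  test("env vars come from the stub map, not the process environment") {
    val conf: SparkConf = new SparkConfWithEnv(Map("SPARK_LOCAL_DIRS" -> "/tmp"))
      .set("spark.local.dir", "/NONEXISTENT_PATH")
    assert(conf.getenv("SPARK_LOCAL_DIRS") === "/tmp")
    assert(conf.get("spark.local.dir") === "/NONEXISTENT_PATH")
  }
}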