 core/src/main/scala/org/apache/spark/SecurityManager.scala                      | 72
 core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala                 | 23
 core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala        | 13
 core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala    | 28
 core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala               | 16
 core/src/test/scala/org/apache/spark/util/SparkConfWithEnv.scala                | 34
 yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala      |  3
 yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala | 32
 8 files changed, 138 insertions(+), 83 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 746d2081d4..64e483e384 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -17,11 +17,13 @@
package org.apache.spark
+import java.lang.{Byte => JByte}
import java.net.{Authenticator, PasswordAuthentication}
-import java.security.KeyStore
+import java.security.{KeyStore, SecureRandom}
import java.security.cert.X509Certificate
import javax.net.ssl._
+import com.google.common.hash.HashCodes
import com.google.common.io.Files
import org.apache.hadoop.io.Text
@@ -130,15 +132,16 @@ import org.apache.spark.util.Utils
*
* The exact mechanisms used to generate/distribute the shared secret are deployment-specific.
*
- * For Yarn deployments, the secret is automatically generated using the Akka remote
- * Crypt.generateSecureCookie() API. The secret is placed in the Hadoop UGI which gets passed
- * around via the Hadoop RPC mechanism. Hadoop RPC can be configured to support different levels
- * of protection. See the Hadoop documentation for more details. Each Spark application on Yarn
- * gets a different shared secret. On Yarn, the Spark UI gets configured to use the Hadoop Yarn
- * AmIpFilter which requires the user to go through the ResourceManager Proxy. That Proxy is there
- * to reduce the possibility of web based attacks through YARN. Hadoop can be configured to use
- * filters to do authentication. That authentication then happens via the ResourceManager Proxy
- * and Spark will use that to do authorization against the view acls.
+ * For YARN deployments, the secret is automatically generated. The secret is placed in the Hadoop
+ * UGI which gets passed around via the Hadoop RPC mechanism. Hadoop RPC can be configured to
+ * support different levels of protection. See the Hadoop documentation for more details. Each
+ * Spark application on YARN gets a different shared secret.
+ *
+ * On YARN, the Spark UI gets configured to use the Hadoop YARN AmIpFilter which requires the user
+ * to go through the ResourceManager Proxy. That proxy is there to reduce the possibility of web
+ * based attacks through YARN. Hadoop can be configured to use filters to do authentication. That
+ * authentication then happens via the ResourceManager Proxy and Spark will use that to do
+ * authorization against the view acls.
*
* For other Spark deployments, the shared secret must be specified via the
* spark.authenticate.secret config.
@@ -189,8 +192,7 @@ import org.apache.spark.util.Utils
private[spark] class SecurityManager(sparkConf: SparkConf)
extends Logging with SecretKeyHolder {
- // key used to store the spark secret in the Hadoop UGI
- private val sparkSecretLookupKey = "sparkCookie"
+ import SecurityManager._
private val authOn = sparkConf.getBoolean(SecurityManager.SPARK_AUTH_CONF, false)
// keep spark.ui.acls.enable for backwards compatibility with 1.0
@@ -365,33 +367,38 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
* we throw an exception.
*/
private def generateSecretKey(): String = {
- if (!isAuthenticationEnabled) return null
- // first check to see if the secret is already set, else generate a new one if on yarn
- val sCookie = if (SparkHadoopUtil.get.isYarnMode) {
- val secretKey = SparkHadoopUtil.get.getSecretKeyFromUserCredentials(sparkSecretLookupKey)
- if (secretKey != null) {
- logDebug("in yarn mode, getting secret from credentials")
- return new Text(secretKey).toString
+ if (!isAuthenticationEnabled) {
+ null
+ } else if (SparkHadoopUtil.get.isYarnMode) {
+ // In YARN mode, the secure cookie will be created by the driver and stashed in the
+ // user's credentials, where executors can get it. The check for an array of size 0
+ // is because of the test code in YarnSparkHadoopUtilSuite.
+ val secretKey = SparkHadoopUtil.get.getSecretKeyFromUserCredentials(SECRET_LOOKUP_KEY)
+ if (secretKey == null || secretKey.length == 0) {
+ logDebug("generateSecretKey: yarn mode, secret key from credentials is null")
+ val rnd = new SecureRandom()
+ val length = sparkConf.getInt("spark.authenticate.secretBitLength", 256) / JByte.SIZE
+ val secret = new Array[Byte](length)
+ rnd.nextBytes(secret)
+
+ val cookie = HashCodes.fromBytes(secret).toString()
+ SparkHadoopUtil.get.addSecretKeyToUserCredentials(SECRET_LOOKUP_KEY, cookie)
+ cookie
} else {
- logDebug("getSecretKey: yarn mode, secret key from credentials is null")
+ new Text(secretKey).toString
}
- val cookie = akka.util.Crypt.generateSecureCookie
- // if we generated the secret then we must be the first so lets set it so t
- // gets used by everyone else
- SparkHadoopUtil.get.addSecretKeyToUserCredentials(sparkSecretLookupKey, cookie)
- logInfo("adding secret to credentials in yarn mode")
- cookie
} else {
// user must have set spark.authenticate.secret config
// For Master/Worker, auth secret is in conf; for Executors, it is in env variable
- sys.env.get(SecurityManager.ENV_AUTH_SECRET)
+ Option(sparkConf.getenv(SecurityManager.ENV_AUTH_SECRET))
.orElse(sparkConf.getOption(SecurityManager.SPARK_AUTH_SECRET_CONF)) match {
case Some(value) => value
- case None => throw new Exception("Error: a secret key must be specified via the " +
- SecurityManager.SPARK_AUTH_SECRET_CONF + " config")
+ case None =>
+ throw new IllegalArgumentException(
+ "Error: a secret key must be specified via the " +
+ SecurityManager.SPARK_AUTH_SECRET_CONF + " config")
}
}
- sCookie
}
/**
@@ -475,6 +482,9 @@ private[spark] object SecurityManager {
val SPARK_AUTH_CONF: String = "spark.authenticate"
val SPARK_AUTH_SECRET_CONF: String = "spark.authenticate.secret"
// This is used to set auth secret to an executor's env variable. It should have the same
- // value as SPARK_AUTH_SECERET_CONF set in SparkConf
+ // value as SPARK_AUTH_SECRET_CONF set in SparkConf
val ENV_AUTH_SECRET = "_SPARK_AUTH_SECRET"
+
+ // key used to store the spark secret in the Hadoop UGI
+ val SECRET_LOOKUP_KEY = "sparkCookie"
}
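For reference, a minimal standalone sketch of the generation path added above: draw spark.authenticate.secretBitLength bits (256 by default) from SecureRandom and encode them through Guava's HashCodes, the same way the patched generateSecretKey() does. The object name SecretKeySketch is illustrative only; in the real code this logic lives inside SecurityManager and reads the bit length from SparkConf.

    import java.lang.{Byte => JByte}
    import java.security.SecureRandom

    import com.google.common.hash.HashCodes

    // Sketch of the new YARN secret generation: random bytes from SecureRandom,
    // encoded to a cookie string via Guava's HashCodes (as in the patch above).
    object SecretKeySketch {
      def generate(bitLength: Int = 256): String = {
        val rnd = new SecureRandom()
        val secret = new Array[Byte](bitLength / JByte.SIZE) // 256 bits -> 32 bytes
        rnd.nextBytes(secret)
        HashCodes.fromBytes(secret).toString()               // hex-encoded cookie
      }
    }
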
diff --git a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
index f29160d834..26b95c0678 100644
--- a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark
import java.io.File
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SparkConfWithEnv, Utils}
class SecurityManagerSuite extends SparkFunSuite {
@@ -223,5 +223,26 @@ class SecurityManagerSuite extends SparkFunSuite {
assert(securityManager.hostnameVerifier.isDefined === false)
}
+ test("missing secret authentication key") {
+ val conf = new SparkConf().set("spark.authenticate", "true")
+ intercept[IllegalArgumentException] {
+ new SecurityManager(conf)
+ }
+ }
+
+ test("secret authentication key") {
+ val key = "very secret key"
+ val conf = new SparkConf()
+ .set(SecurityManager.SPARK_AUTH_CONF, "true")
+ .set(SecurityManager.SPARK_AUTH_SECRET_CONF, key)
+ assert(key === new SecurityManager(conf).getSecretKey())
+
+ val keyFromEnv = "very secret key from env"
+ val conf2 = new SparkConfWithEnv(Map(SecurityManager.ENV_AUTH_SECRET -> keyFromEnv))
+ .set(SecurityManager.SPARK_AUTH_CONF, "true")
+ .set(SecurityManager.SPARK_AUTH_SECRET_CONF, key)
+ assert(keyFromEnv === new SecurityManager(conf2).getSecretKey())
+ }
+
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
index 86eb41dd7e..8dd31b4b6f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
@@ -25,6 +25,7 @@ import scala.io.Source
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListener}
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.util.SparkConfWithEnv
class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
@@ -53,17 +54,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
test("verify that log urls reflect SPARK_PUBLIC_DNS (SPARK-6175)") {
val SPARK_PUBLIC_DNS = "public_dns"
- class MySparkConf extends SparkConf(false) {
- override def getenv(name: String): String = {
- if (name == "SPARK_PUBLIC_DNS") SPARK_PUBLIC_DNS
- else super.getenv(name)
- }
-
- override def clone: SparkConf = {
- new MySparkConf().setAll(getAll)
- }
- }
- val conf = new MySparkConf().set(
+ val conf = new SparkConfWithEnv(Map("SPARK_PUBLIC_DNS" -> SPARK_PUBLIC_DNS)).set(
"spark.extraListeners", classOf[SaveExecutorInfo].getName)
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala
index 15f7ca4a6d..637e78fda0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala
@@ -19,7 +19,7 @@
package org.apache.spark.deploy.worker
import org.apache.spark.{SparkConf, SparkFunSuite}
-
+import org.apache.spark.util.SparkConfWithEnv
class WorkerArgumentsTest extends SparkFunSuite {
@@ -34,18 +34,7 @@ class WorkerArgumentsTest extends SparkFunSuite {
test("Memory can't be set to 0 when SPARK_WORKER_MEMORY env property leaves off M or G") {
val args = Array("spark://localhost:0000 ")
-
- class MySparkConf extends SparkConf(false) {
- override def getenv(name: String): String = {
- if (name == "SPARK_WORKER_MEMORY") "50000"
- else super.getenv(name)
- }
-
- override def clone: SparkConf = {
- new MySparkConf().setAll(getAll)
- }
- }
- val conf = new MySparkConf()
+ val conf = new SparkConfWithEnv(Map("SPARK_WORKER_MEMORY" -> "50000"))
intercept[IllegalStateException] {
new WorkerArguments(args, conf)
}
@@ -53,18 +42,7 @@ class WorkerArgumentsTest extends SparkFunSuite {
test("Memory correctly set when SPARK_WORKER_MEMORY env property appends G") {
val args = Array("spark://localhost:0000 ")
-
- class MySparkConf extends SparkConf(false) {
- override def getenv(name: String): String = {
- if (name == "SPARK_WORKER_MEMORY") "5G"
- else super.getenv(name)
- }
-
- override def clone: SparkConf = {
- new MySparkConf().setAll(getAll)
- }
- }
- val conf = new MySparkConf()
+ val conf = new SparkConfWithEnv(Map("SPARK_WORKER_MEMORY" -> "5G"))
val workerArgs = new WorkerArguments(args, conf)
assert(workerArgs.memory === 5120)
}
diff --git a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
index ac6fec56bb..cc50289c7b 100644
--- a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.util.Utils
import org.scalatest.BeforeAndAfter
import org.apache.spark.{SparkConf, SparkFunSuite}
-
+import org.apache.spark.util.SparkConfWithEnv
/**
* Tests for the spark.local.dir and SPARK_LOCAL_DIRS configuration options.
@@ -45,20 +45,10 @@ class LocalDirsSuite extends SparkFunSuite with BeforeAndAfter {
test("SPARK_LOCAL_DIRS override also affects driver") {
// Regression test for SPARK-2975
assert(!new File("/NONEXISTENT_DIR").exists())
- // SPARK_LOCAL_DIRS is a valid directory:
- class MySparkConf extends SparkConf(false) {
- override def getenv(name: String): String = {
- if (name == "SPARK_LOCAL_DIRS") System.getProperty("java.io.tmpdir")
- else super.getenv(name)
- }
-
- override def clone: SparkConf = {
- new MySparkConf().setAll(getAll)
- }
- }
// spark.local.dir only contains invalid directories, but that's not a problem since
// SPARK_LOCAL_DIRS will override it on both the driver and workers:
- val conf = new MySparkConf().set("spark.local.dir", "/NONEXISTENT_PATH")
+ val conf = new SparkConfWithEnv(Map("SPARK_LOCAL_DIRS" -> System.getProperty("java.io.tmpdir")))
+ .set("spark.local.dir", "/NONEXISTENT_PATH")
assert(new File(Utils.getLocalDir(conf)).exists())
}
diff --git a/core/src/test/scala/org/apache/spark/util/SparkConfWithEnv.scala b/core/src/test/scala/org/apache/spark/util/SparkConfWithEnv.scala
new file mode 100644
index 0000000000..ddd5edf4f7
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/SparkConfWithEnv.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.apache.spark.SparkConf
+
+/**
+ * Customized SparkConf that allows env variables to be overridden.
+ */
+class SparkConfWithEnv(env: Map[String, String]) extends SparkConf(false) {
+ override def getenv(name: String): String = {
+ env.get(name).getOrElse(super.getenv(name))
+ }
+
+ override def clone: SparkConf = {
+ new SparkConfWithEnv(env).setAll(getAll)
+ }
+
+}
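Usage is straightforward: the helper answers getenv() from the supplied map and falls back to the real environment otherwise, so tests can fake env variables without mutating the JVM's environment. A small illustrative fragment (not part of the patch), e.g. inside a test body:

    import org.apache.spark.util.SparkConfWithEnv

    val conf = new SparkConfWithEnv(Map("SPARK_WORKER_MEMORY" -> "5G"))
    assert(conf.getenv("SPARK_WORKER_MEMORY") == "5G")   // answered from the map
    assert(conf.getenv("PATH") == System.getenv("PATH")) // falls back to the real environment
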
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 5924daf3ec..561ad79ee0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -18,6 +18,7 @@
package org.apache.spark.deploy.yarn
import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
import java.util.regex.Matcher
import java.util.regex.Pattern
@@ -81,7 +82,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
override def addSecretKeyToUserCredentials(key: String, secret: String) {
val creds = new Credentials()
- creds.addSecretKey(new Text(key), secret.getBytes("utf-8"))
+ creds.addSecretKey(new Text(key), secret.getBytes(UTF_8))
addCurrentUserCredentials(creds)
}
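For context, a rough sketch of the Hadoop UGI round trip the secret relies on: the driver stashes the cookie under SECRET_LOOKUP_KEY via addSecretKeyToUserCredentials, and executors later read it back from the current user's credentials. The object UgiSecretSketch below is illustrative only and assumes the counterpart getSecretKeyFromUserCredentials works along these lines; it is not the patched code.

    import java.nio.charset.StandardCharsets.UTF_8

    import org.apache.hadoop.io.Text
    import org.apache.hadoop.security.{Credentials, UserGroupInformation}

    object UgiSecretSketch {
      // Store a secret in the current user's UGI, as addSecretKeyToUserCredentials does above.
      def storeSecret(key: String, secret: String): Unit = {
        val creds = new Credentials()
        creds.addSecretKey(new Text(key), secret.getBytes(UTF_8))
        UserGroupInformation.getCurrentUser.addCredentials(creds)
      }

      // Read it back; getSecretKeyFromUserCredentials is assumed to behave roughly like this.
      def fetchSecret(key: String): Array[Byte] =
        UserGroupInformation.getCurrentUser.getCredentials.getSecretKey(new Text(key))
    }
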
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index 9132c56a91..a70e66d39a 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -24,8 +24,10 @@ import com.google.common.io.{ByteStreams, Files}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.metadata.HiveException
+import org.apache.hadoop.io.Text
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.scalatest.Matchers
@@ -263,7 +265,7 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
})
}
- def assertNestedHiveException(e: InvocationTargetException): Throwable = {
+ private def assertNestedHiveException(e: InvocationTargetException): Throwable = {
val inner = e.getCause
if (inner == null) {
fail("No inner cause", e)
@@ -274,4 +276,32 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
inner
}
+ // This test needs to live here because it depends on isYarnMode returning true, which can only
+ // happen in the YARN module.
+ test("security manager token generation") {
+ try {
+ System.setProperty("SPARK_YARN_MODE", "true")
+ val initial = SparkHadoopUtil.get
+ .getSecretKeyFromUserCredentials(SecurityManager.SECRET_LOOKUP_KEY)
+ assert(initial === null || initial.length === 0)
+
+ val conf = new SparkConf()
+ .set(SecurityManager.SPARK_AUTH_CONF, "true")
+ .set(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused")
+ val sm = new SecurityManager(conf)
+
+ val generated = SparkHadoopUtil.get
+ .getSecretKeyFromUserCredentials(SecurityManager.SECRET_LOOKUP_KEY)
+ assert(generated != null)
+ val genString = new Text(generated).toString()
+ assert(genString != "unused")
+ assert(sm.getSecretKey() === genString)
+ } finally {
+ // removeSecretKey() was only added in Hadoop 2.6, so instead we just set the secret
+ // to an empty string.
+ SparkHadoopUtil.get.addSecretKeyToUserCredentials(SecurityManager.SECRET_LOOKUP_KEY, "")
+ System.clearProperty("SPARK_YARN_MODE")
+ }
+ }
+
}