-rw-r--r--  docs/running-on-yarn.md                                                          7
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala        56
-rw-r--r--  yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala   55
3 files changed, 101 insertions(+), 17 deletions(-)
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 0362f5a223..573930dbf4 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -106,6 +106,13 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
set this configuration to "hdfs:///some/path".
</td>
</tr>
+<tr>
+ <td><code>spark.yarn.access.namenodes</code></td>
+ <td>(none)</td>
+ <td>
+    A comma-separated list of secure HDFS namenodes your Spark application is going to access. For example, `spark.yarn.access.namenodes=hdfs://nn1.com:8032,hdfs://nn2.com:8032`. The Spark application must have access to the namenodes listed, and Kerberos must be properly configured to be able to access them (either in the same realm or in a trusted realm). Spark acquires security tokens for each of the namenodes so that the Spark application can access those remote HDFS clusters.
+ </td>
+</tr>
</table>
# Launching Spark on YARN
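
For illustration only (not part of this patch), the new property can be set programmatically on a `SparkConf` before submitting to YARN; the namenode hostnames below are placeholders:

```scala
// Illustrative sketch: point a YARN application at two secure HDFS clusters.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("cross-cluster-app")
  .set("spark.yarn.access.namenodes", "hdfs://nn1.com:8032,hdfs://nn2.com:8032")
```
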
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index b7e8636e02..ed8f56ab8b 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.mapreduce.MRJobConfig
-import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.util.StringUtils
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
@@ -191,23 +191,11 @@ trait ClientBase extends Logging {
// Upload Spark and the application JAR to the remote file system if necessary. Add them as
// local resources to the application master.
val fs = FileSystem.get(conf)
-
- val delegTokenRenewer = Master.getMasterPrincipal(conf)
- if (UserGroupInformation.isSecurityEnabled()) {
- if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
- val errorMessage = "Can't get Master Kerberos principal for use as renewer"
- logError(errorMessage)
- throw new SparkException(errorMessage)
- }
- }
val dst = new Path(fs.getHomeDirectory(), appStagingDir)
- val replication = sparkConf.getInt("spark.yarn.submit.file.replication", 3).toShort
-
- if (UserGroupInformation.isSecurityEnabled()) {
- val dstFs = dst.getFileSystem(conf)
- dstFs.addDelegationTokens(delegTokenRenewer, credentials)
- }
+ val nns = ClientBase.getNameNodesToAccess(sparkConf) + dst
+ ClientBase.obtainTokensForNamenodes(nns, conf, credentials)
+ val replication = sparkConf.getInt("spark.yarn.submit.file.replication", 3).toShort
val localResources = HashMap[String, LocalResource]()
FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION))
@@ -614,4 +602,40 @@ object ClientBase extends Logging {
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, path,
File.pathSeparator)
+ /**
+ * Get the list of namenodes the user may access.
+ */
+ private[yarn] def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
+ sparkConf.get("spark.yarn.access.namenodes", "").split(",").map(_.trim()).filter(!_.isEmpty)
+ .map(new Path(_)).toSet
+ }
+
+ private[yarn] def getTokenRenewer(conf: Configuration): String = {
+ val delegTokenRenewer = Master.getMasterPrincipal(conf)
+ logDebug("delegation token renewer is: " + delegTokenRenewer)
+ if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
+ val errorMessage = "Can't get Master Kerberos principal for use as renewer"
+ logError(errorMessage)
+ throw new SparkException(errorMessage)
+ }
+ delegTokenRenewer
+ }
+
+ /**
+ * Obtains tokens for the namenodes passed in and adds them to the credentials.
+ */
+ private[yarn] def obtainTokensForNamenodes(paths: Set[Path], conf: Configuration,
+ creds: Credentials) {
+ if (UserGroupInformation.isSecurityEnabled()) {
+ val delegTokenRenewer = getTokenRenewer(conf)
+
+ paths.foreach {
+ dst =>
+ val dstFs = dst.getFileSystem(conf)
+ logDebug("getting token for namenode: " + dst)
+ dstFs.addDelegationTokens(delegTokenRenewer, creds)
+ }
+ }
+ }
+
}
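
To summarize the new flow without the diff markers, here is a minimal sketch assuming a Hadoop `Configuration`, a `SparkConf`, a staging directory name, and a `Credentials` object like those already in scope in `ClientBase` (the helpers are `private[yarn]`, so this only compiles from within that package):

```scala
// Sketch of the token-acquisition flow introduced above; not part of the patch.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.Credentials
import org.apache.spark.SparkConf

def collectHdfsTokens(sparkConf: SparkConf, conf: Configuration,
    appStagingDir: String, credentials: Credentials): Unit = {
  val fs = FileSystem.get(conf)
  val dst = new Path(fs.getHomeDirectory(), appStagingDir)
  // The staging directory's namenode is always included; namenodes listed in
  // spark.yarn.access.namenodes are added on top. Tokens are only fetched when
  // Kerberos security is enabled.
  val nns = ClientBase.getNameNodesToAccess(sparkConf) + dst
  ClientBase.obtainTokensForNamenodes(nns, conf, credentials)
}
```
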
diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
index 686714dc36..68cc2890f3 100644
--- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
+++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
@@ -31,6 +31,8 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.mockito.Matchers._
import org.mockito.Mockito._
+
+
import org.scalatest.FunSuite
import org.scalatest.Matchers
@@ -38,7 +40,7 @@ import scala.collection.JavaConversions._
import scala.collection.mutable.{ HashMap => MutableHashMap }
import scala.util.Try
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkException, SparkConf}
import org.apache.spark.util.Utils
class ClientBaseSuite extends FunSuite with Matchers {
@@ -138,6 +140,57 @@ class ClientBaseSuite extends FunSuite with Matchers {
}
}
+ test("check access nns empty") {
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.yarn.access.namenodes", "")
+ val nns = ClientBase.getNameNodesToAccess(sparkConf)
+ nns should be(Set())
+ }
+
+ test("check access nns unset") {
+ val sparkConf = new SparkConf()
+ val nns = ClientBase.getNameNodesToAccess(sparkConf)
+ nns should be(Set())
+ }
+
+ test("check access nns") {
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032")
+ val nns = ClientBase.getNameNodesToAccess(sparkConf)
+ nns should be(Set(new Path("hdfs://nn1:8032")))
+ }
+
+ test("check access nns space") {
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032, ")
+ val nns = ClientBase.getNameNodesToAccess(sparkConf)
+ nns should be(Set(new Path("hdfs://nn1:8032")))
+ }
+
+ test("check access two nns") {
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032,hdfs://nn2:8032")
+ val nns = ClientBase.getNameNodesToAccess(sparkConf)
+ nns should be(Set(new Path("hdfs://nn1:8032"), new Path("hdfs://nn2:8032")))
+ }
+
+ test("check token renewer") {
+ val hadoopConf = new Configuration()
+ hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
+ hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM")
+ val renewer = ClientBase.getTokenRenewer(hadoopConf)
+ renewer should be ("yarn/myrm:8032@SPARKTEST.COM")
+ }
+
+ test("check token renewer default") {
+ val hadoopConf = new Configuration()
+ val caught =
+ intercept[SparkException] {
+ ClientBase.getTokenRenewer(hadoopConf)
+ }
+ assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
+ }
+
object Fixtures {
val knownDefYarnAppCP: Seq[String] =