core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 51
docs/running-on-yarn.md                                            | 96
yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala      |  3
3 files changed, 148 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 2e9e45a155..7a5fc866bb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,10 +17,11 @@
package org.apache.spark.deploy
-import java.io.{ByteArrayInputStream, DataInputStream}
+import java.io.{ByteArrayInputStream, DataInputStream, IOException}
import java.lang.reflect.Method
import java.security.PrivilegedExceptionAction
-import java.util.{Arrays, Comparator}
+import java.text.DateFormat
+import java.util.{Arrays, Comparator, Date}
import scala.collection.JavaConverters._
import scala.concurrent.duration._
@@ -34,6 +35,8 @@ import org.apache.hadoop.fs.FileSystem.Statistics
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.annotation.DeveloperApi
@@ -357,6 +360,50 @@ class SparkHadoopUtil extends Logging {
newConf.setBoolean(confKey, true)
newConf
}
+
+ /**
+ * Dump the credentials' tokens to string values.
+ *
+ * @param credentials credentials
+ * @return an iterable over the string values; an empty list if no credentials are passed in
+ */
+ private[spark] def dumpTokens(credentials: Credentials): Iterable[String] = {
+ if (credentials != null) {
+ credentials.getAllTokens.asScala.map(tokenToString)
+ } else {
+ Seq()
+ }
+ }
+
+ /**
+ * Convert a token to a string for logging.
+ * If it's an abstract delegation token, attempt to unmarshal it and then
+ * print more details, including timestamps in human-readable form.
+ *
+ * @param token token to convert to a string
+ * @return a printable string value.
+ */
+ private[spark] def tokenToString(token: Token[_ <: TokenIdentifier]): String = {
+ val df = DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.SHORT)
+ val buffer = new StringBuilder(128)
+ buffer.append(token.toString)
+ try {
+ val ti = token.decodeIdentifier
+ buffer.append("; ").append(ti)
+ ti match {
+ case dt: AbstractDelegationTokenIdentifier =>
+ // include human times and the renewer, which the HDFS tokens toString omits
+ buffer.append("; Renewer: ").append(dt.getRenewer)
+ buffer.append("; Issued: ").append(df.format(new Date(dt.getIssueDate)))
+ buffer.append("; Max Date: ").append(df.format(new Date(dt.getMaxDate)))
+ case _ =>
+ }
+ } catch {
+ case e: IOException =>
+ logDebug(s"Failed to decode $token: $e", e)
+ }
+ buffer.toString
+ }
}
object SparkHadoopUtil {
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 3bd16bf60c..f2fbe3ca56 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -476,3 +476,99 @@ If you need a reference to the proper location to put log files in the YARN so t
- In `cluster` mode, the local directories used by the Spark executors and the Spark driver will be the local directories configured for YARN (Hadoop YARN config `yarn.nodemanager.local-dirs`). If the user specifies `spark.local.dir`, it will be ignored. In `client` mode, the Spark executors will use the local directories configured for YARN while the Spark driver will use those defined in `spark.local.dir`. This is because the Spark driver does not run on the YARN cluster in `client` mode, only the Spark executors do.
- The `--files` and `--archives` options support specifying file names with the # similar to Hadoop. For example you can specify: `--files localtest.txt#appSees.txt` and this will upload the file you have locally named `localtest.txt` into HDFS but this will be linked to by the name `appSees.txt`, and your application should use the name as `appSees.txt` to reference it when running on YARN.
- The `--jars` option allows the `SparkContext.addJar` function to work if you are using it with local files and running in `cluster` mode. It does not need to be used if you are using it with HDFS, HTTP, HTTPS, or FTP files.
+
+# Running in a Secure Cluster
+
+As covered in [security](security.html), Kerberos is used in a secure Hadoop cluster to
+authenticate principals associated with services and clients. This allows clients to
+make requests of these authenticated services, and the services to grant rights
+to the authenticated principals.
+
+Hadoop services issue *Hadoop tokens* to grant access to the services and data.
+Clients must first acquire tokens for the services they will access and pass them along with their
+application as it is launched in the YARN cluster.
+
+For a Spark application to interact with HDFS, HBase and Hive, it must acquire the relevant tokens
+using the Kerberos credentials of the user launching the application
+—that is, the principal whose identity will become that of the launched Spark application.
+
+This is normally done at launch time: in a secure cluster Spark will automatically obtain a
+token for the cluster's HDFS filesystem, and potentially for HBase and Hive.
+
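+For example, the application can be submitted after authenticating to Kerberos with
+`kinit`; the principal and application below are placeholders:
+
+```bash
+# Authenticate as the launching user; Spark uses these credentials to obtain
+# the HDFS (and, if configured, HBase and Hive) tokens when the job is submitted.
+kinit user@EXAMPLE.COM
+spark-submit --master yarn --deploy-mode cluster \
+  --class org.example.MyApp my-app.jar
+```
+
+Alternatively, a principal and keytab can be passed to `spark-submit` with the
+`--principal` and `--keytab` options, in which case Spark performs the login itself.
+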
+An HBase token will be obtained if HBase is on the classpath, the HBase configuration declares
+the application is secure (i.e. `hbase-site.xml` sets `hbase.security.authentication` to `kerberos`),
+and `spark.yarn.security.tokens.hbase.enabled` is not set to `false`.
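+
+For example, a Kerberos-enabled HBase deployment will typically carry something like the
+following in its `hbase-site.xml` (the value shown is exactly the setting described above):
+
+```xml
+<property>
+  <name>hbase.security.authentication</name>
+  <value>kerberos</value>
+</property>
+```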
+
+Similarly, a Hive token will be obtained if Hive is on the classpath, its configuration
+includes a URI of the metastore in `hive.metastore.uris`, and
+`spark.yarn.security.tokens.hive.enabled` is not set to `false`.
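+
+As an illustration (the host and port below are placeholders), the metastore URI is
+declared in `hive-site.xml` along these lines:
+
+```xml
+<property>
+  <name>hive.metastore.uris</name>
+  <value>thrift://hivemetastore.example.org:9083</value>
+</property>
+```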
+
+If an application needs to interact with other secure HDFS clusters, then
+the tokens needed to access these clusters must be explicitly requested at
+launch time. This is done by listing them in the `spark.yarn.access.namenodes` property.
+
+```
+spark.yarn.access.namenodes hdfs://ireland.example.org:8020/,hdfs://frankfurt.example.org:8020/
+```
+
+## Launching your application with Apache Oozie
+
+Apache Oozie can launch Spark applications as part of a workflow.
+In a secure cluster, the launched application will need the relevant tokens to access the cluster's
+services. If Spark is launched with a keytab, this is automatic.
+However, if Spark is to be launched without a keytab, the responsibility for setting up security
+must be handed over to Oozie.
+
+The details of configuring Oozie for secure clusters and obtaining
+credentials for a job can be found on the [Oozie web site](http://oozie.apache.org/)
+in the "Authentication" section of the specific release's documentation.
+
+For Spark applications, the Oozie workflow must be set up for Oozie to request all tokens which
+the application needs, including:
+
+- The YARN resource manager.
+- The local HDFS filesystem.
+- Any remote HDFS filesystems used as a source or destination of I/O.
+- Hive —if used.
+- HBase —if used.
+- The YARN timeline server, if the application interacts with this.
+
+To avoid Spark attempting —and then failing— to obtain Hive, HBase and remote HDFS tokens,
+the Spark configuration must be set to disable token collection for the services.
+
+The Spark configuration must include the lines:
+
+```
+spark.yarn.security.tokens.hive.enabled false
+spark.yarn.security.tokens.hbase.enabled false
+```
+
+The configuration option `spark.yarn.access.namenodes` must be unset.
+
+## Troubleshooting Kerberos
+
+Debugging Hadoop/Kerberos problems can be "difficult". One useful technique is to
+enable extra logging of Kerberos operations in Hadoop by setting the `HADOOP_JAAS_DEBUG`
+environment variable.
+
+```bash
+export HADOOP_JAAS_DEBUG=true
+```
+
+The JDK classes can be configured to enable extra logging of their Kerberos and
+SPNEGO/REST authentication via the system properties `sun.security.krb5.debug`
+and `sun.security.spnego.debug`:
+
+```
+-Dsun.security.krb5.debug=true -Dsun.security.spnego.debug=true
+```
+
+All these options can be enabled in the Application Master:
+
+```
+spark.yarn.appMasterEnv.HADOOP_JAAS_DEBUG true
+spark.yarn.am.extraJavaOptions -Dsun.security.krb5.debug=true -Dsun.security.spnego.debug=true
+```
+
+Finally, if the log level for `org.apache.spark.deploy.yarn.Client` is set to `DEBUG`, the log
+will include a list of all tokens obtained, and their expiry details.
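+
+With Spark's default log4j-based logging, a minimal way to enable this (adjust your own
+`conf/log4j.properties` as needed) is:
+
+```
+log4j.logger.org.apache.spark.deploy.yarn.Client=DEBUG
+```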
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index a12391d081..598eb17d5d 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -375,6 +375,9 @@ private[spark] class Client(
val distributedNames = new HashSet[String]
YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(sparkConf, hadoopConf, credentials)
YarnSparkHadoopUtil.get.obtainTokenForHBase(sparkConf, hadoopConf, credentials)
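+ // Log the obtained delegation tokens at debug level, to help diagnose problems in secure clusters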
+ if (credentials != null) {
+ logDebug(YarnSparkHadoopUtil.get.dumpTokens(credentials).mkString("\n"))
+ }
val replication = sparkConf.get(STAGING_FILE_REPLICATION).map(_.toShort)
.getOrElse(fs.getDefaultReplication(destDir))