diff options
author | Thomas Graves <tgraves@thatenemy-lm.champ.corp.yahoo.com> | 2013-06-19 11:18:42 -0500 |
---|---|---|
committer | Thomas Graves <tgraves@thatenemy-lm.champ.corp.yahoo.com> | 2013-06-19 11:18:42 -0500 |
commit | 75d78c7ac90ce717bf1009ec4d335fb4a6cfde24 (patch) | |
tree | 56c92d7a2a667c5e77eec864efe4373a8dcea728 /streaming/src | |
parent | 84530ba6d9fa47ee2863bb50c23742ecfa2a6a64 (diff) | |
download | spark-75d78c7ac90ce717bf1009ec4d335fb4a6cfde24.tar.gz spark-75d78c7ac90ce717bf1009ec4d335fb4a6cfde24.tar.bz2 spark-75d78c7ac90ce717bf1009ec4d335fb4a6cfde24.zip |
Add support for Spark on Yarn on a secure Hadoop cluster
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala | 7 |
1 files changed, 6 insertions, 1 deletions
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala index 3ec922957d..20ee1d3c5d 100644 --- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala @@ -14,6 +14,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.mapred.{JobConf, OutputFormat} import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} import org.apache.hadoop.mapred.OutputFormat +import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.conf.Configuration class PairDStreamFunctions[K: ClassManifest, V: ClassManifest](self: DStream[(K,V)]) @@ -470,7 +471,11 @@ extends Serializable { valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = new JobConf - ) { + ) { + // make sure to propogate any credentials from the current user to the jobConf + // for Hadoop security + val jobCreds = conf.getCredentials(); + jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials()) val saveFunc = (rdd: RDD[(K, V)], time: Time) => { val file = rddToFileName(prefix, suffix, time) rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf) |