From 75d78c7ac90ce717bf1009ec4d335fb4a6cfde24 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Wed, 19 Jun 2013 11:18:42 -0500 Subject: Add support for Spark on Yarn on a secure Hadoop cluster --- .../src/main/scala/spark/streaming/PairDStreamFunctions.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala index 3ec922957d..20ee1d3c5d 100644 --- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala @@ -14,6 +14,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.mapred.{JobConf, OutputFormat} import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} import org.apache.hadoop.mapred.OutputFormat +import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.conf.Configuration class PairDStreamFunctions[K: ClassManifest, V: ClassManifest](self: DStream[(K,V)]) @@ -470,7 +471,11 @@ extends Serializable { valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = new JobConf - ) { + ) { + // make sure to propagate any credentials from the current user to the jobConf + // for Hadoop security + val jobCreds = conf.getCredentials(); + jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials()) val saveFunc = (rdd: RDD[(K, V)], time: Time) => { val file = rddToFileName(prefix, suffix, time) rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf) -- cgit v1.2.3