aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorThomas Graves <tgraves@thatenemy-lm.champ.corp.yahoo.com>2013-06-19 11:18:42 -0500
committerThomas Graves <tgraves@thatenemy-lm.champ.corp.yahoo.com>2013-06-19 11:18:42 -0500
commit75d78c7ac90ce717bf1009ec4d335fb4a6cfde24 (patch)
tree56c92d7a2a667c5e77eec864efe4373a8dcea728 /streaming
parent84530ba6d9fa47ee2863bb50c23742ecfa2a6a64 (diff)
downloadspark-75d78c7ac90ce717bf1009ec4d335fb4a6cfde24.tar.gz
spark-75d78c7ac90ce717bf1009ec4d335fb4a6cfde24.tar.bz2
spark-75d78c7ac90ce717bf1009ec4d335fb4a6cfde24.zip
Add support for Spark on Yarn on a secure Hadoop cluster
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala7
1 files changed, 6 insertions, 1 deletions
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index 3ec922957d..20ee1d3c5d 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -14,6 +14,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.mapred.OutputFormat
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.conf.Configuration
class PairDStreamFunctions[K: ClassManifest, V: ClassManifest](self: DStream[(K,V)])
@@ -470,7 +471,11 @@ extends Serializable {
valueClass: Class[_],
outputFormatClass: Class[_ <: OutputFormat[_, _]],
conf: JobConf = new JobConf
- ) {
+ ) {
+  // make sure to propagate any credentials from the current user to the jobConf
+ // for Hadoop security
+ val jobCreds = conf.getCredentials();
+ jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)