author    Thomas Graves <tgraves@apache.org>    2015-02-02 22:45:55 -0800
committer Josh Rosen <joshrosen@databricks.com>    2015-02-02 22:45:55 -0800
commit    c31c36c4a76bd3449696383321332ec95bff7fed (patch)
tree      46b1d56b1c17073ae817a0a6ffa2fc155613edcc /core
parent    eb0da6c4bd55aaab972c53eb934e68257b8994e5 (diff)
[SPARK-3778] newAPIHadoopRDD doesn't properly pass credentials for secure hdfs
This was https://github.com/apache/spark/pull/2676 and https://issues.apache.org/jira/browse/SPARK-3778. It affects anyone trying to access secure HDFS with something like:

    val lines = {
      val hconf = new Configuration()
      hconf.set("mapred.input.dir", "mydir")
      hconf.set("textinputformat.record.delimiter", "\003432\n")
      sc.newAPIHadoopRDD(hconf, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
    }

Author: Thomas Graves <tgraves@apache.org>

Closes #4292 from tgravescs/SPARK-3788 and squashes the following commits:

cf3b453 [Thomas Graves] newAPIHadoopRDD doesn't properly pass credentials for secure hdfs on yarn
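For context, a minimal sketch of what the credential propagation in this patch amounts to (not Spark source; the helper name is hypothetical, and it assumes Hadoop's JobConf and UserGroupInformation APIs):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapred.JobConf
    import org.apache.hadoop.security.UserGroupInformation

    // Hypothetical helper mirroring the fix: wrap the user-supplied Configuration
    // in a JobConf and merge the current user's credentials (e.g. HDFS delegation
    // tokens) into it, so tasks can authenticate against secure HDFS.
    def withCredentials(conf: Configuration): JobConf = {
      val jconf = new JobConf(conf)
      jconf.getCredentials.mergeAll(UserGroupInformation.getCurrentUser.getCredentials)
      jconf
    }

In the patch itself this is done through SparkHadoopUtil.get.addCredentials(jconf), as shown in the second hunk below.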
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/SparkContext.scala | 7
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 228076f01c..6a16a31654 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -804,6 +804,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
       vClass: Class[V],
       conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
     assertNotStopped()
+    // The call to new NewHadoopJob automatically adds security credentials to conf,
+    // so we don't need to explicitly add them ourselves
     val job = new NewHadoopJob(conf)
     NewFileInputFormat.addInputPath(job, new Path(path))
     val updatedConf = job.getConfiguration
@@ -826,7 +828,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
       kClass: Class[K],
       vClass: Class[V]): RDD[(K, V)] = {
     assertNotStopped()
-    new NewHadoopRDD(this, fClass, kClass, vClass, conf)
+    // Add necessary security credentials to the JobConf. Required to access secure HDFS.
+    val jconf = new JobConf(conf)
+    SparkHadoopUtil.get.addCredentials(jconf)
+    new NewHadoopRDD(this, fClass, kClass, vClass, jconf)
   }
 
   /** Get an RDD for a Hadoop SequenceFile with given key and value types.
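For reference, a caller-side sketch of the code path this patch fixes, adapted from the report above (illustrative only: "mydir" and the delimiter come from the commit message, and a kerberized HDFS is assumed):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    // Pass a custom Configuration to newAPIHadoopRDD. Before this patch the
    // user's delegation tokens were never copied into that conf, so the read
    // failed against secure HDFS; with the fix, credentials are added for us.
    val hconf = new Configuration()
    hconf.set("mapred.input.dir", "mydir")
    hconf.set("textinputformat.record.delimiter", "\003432\n")
    val lines = sc.newAPIHadoopRDD(hconf, classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text])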