 core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 146609ae39..7a11978304 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -24,6 +24,7 @@ import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
 import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
@@ -93,7 +94,13 @@ class NewHadoopRDD[K, V](
       // issues, this cloning is disabled by default.
       NewHadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
         logDebug("Cloning Hadoop Configuration")
-        new Configuration(conf)
+        // The Configuration passed in is actually a JobConf and possibly contains credentials.
+        // To keep those credentials properly we have to create a new JobConf not a Configuration.
+        if (conf.isInstanceOf[JobConf]) {
+          new JobConf(conf)
+        } else {
+          new Configuration(conf)
+        }
       }
     } else {
       conf
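
For context, here is a minimal, self-contained Scala sketch (not part of the commit) of why the branch matters: a JobConf carries a Credentials object (delegation tokens and secrets) that a plain Configuration copy does not preserve, while the JobConf copy constructor does. The object name JobConfCloneSketch and the token alias "sketch-token" are invented for illustration; the Hadoop calls used (the JobConf(Configuration) copy constructor, JobConf.getCredentials, Credentials.addToken, Credentials.numberOfTokens) are standard Hadoop API.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.token.{Token, TokenIdentifier}

object JobConfCloneSketch {
  // Mirror the patched NewHadoopRDD.getConf branch: clone a JobConf as a JobConf,
  // anything else as a plain Configuration.
  def cloneConf(conf: Configuration): Configuration =
    if (conf.isInstanceOf[JobConf]) new JobConf(conf) else new Configuration(conf)

  def main(args: Array[String]): Unit = {
    val original = new JobConf()
    // Attach a dummy token so the JobConf carries non-empty credentials
    // ("sketch-token" is a made-up alias for illustration only).
    original.getCredentials.addToken(new Text("sketch-token"), new Token[TokenIdentifier]())

    val jobConfCopy = new JobConf(original)       // copy constructor keeps the Credentials
    val plainCopy   = new Configuration(original) // copies key/value properties only

    println(s"new JobConf(conf) token count: ${jobConfCopy.getCredentials.numberOfTokens}")
    println(s"new Configuration(conf) is a JobConf: ${plainCopy.isInstanceOf[JobConf]}")
    println(s"cloneConf(conf) token count: " +
      s"${cloneConf(original).asInstanceOf[JobConf].getCredentials.numberOfTokens}")
  }
}

With Hadoop on the classpath this should report a token count of 1 for both JobConf-based copies and false for the plain Configuration copy, which is why the patched getConf only falls back to new Configuration(conf) when the broadcast value is not a JobConf.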