aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/SparkContext.scala 6
-rw-r--r-- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala 8
2 files changed, 6 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 47fe743880..4a0df35eb9 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -333,7 +333,11 @@ class SparkContext(
valueClass: Class[V],
minSplits: Int = defaultMinSplits
): RDD[(K, V)] = {
- new HadoopDatasetRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
+ // Add necessary security credentials to the JobConf before broadcasting it.
+ SparkEnv.get.hadoop.addCredentials(conf)
+ // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it.
+ val confBroadcast = broadcast(new SerializableWritable(conf))
+ new HadoopDatasetRDD(this, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
}
/** Get an RDD for a Hadoop file with an arbitrary InputFormat */
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 1ae8e41162..dd9fc7b79a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -69,19 +69,13 @@ class HadoopFileRDD[K, V](
*/
class HadoopDatasetRDD[K, V](
sc: SparkContext,
- @transient conf: JobConf,
+ confBroadcast: Broadcast[SerializableWritable[JobConf]],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int)
extends HadoopRDD[K, V](sc, inputFormatClass, keyClass, valueClass, minSplits) {
- // Add necessary security credentials to the JobConf before broadcasting it.
- SparkEnv.get.hadoop.addCredentials(conf)
-
- // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it.
- private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
-
override def getJobConf(): JobConf = confBroadcast.value.value
}