author    Harvey <h.feng@berkeley.edu>  2013-09-22 14:43:58 -0700
committer Harvey <h.feng@berkeley.edu>  2013-09-22 14:43:58 -0700
commit    ef34cfb26cffefac4c39c5a6006b836f1e325d09 (patch)
tree      8a2879da21f38f3e58c5470e35e076a039fc57f8 /core
parent    a6eeb5ffd54956667ec4e793149fdab90041ad6c (diff)
Move Configuration broadcasts to SparkContext.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala   | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala  | 8
2 files changed, 6 insertions(+), 8 deletions(-)
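
The change hinges on one detail: a Hadoop JobConf is a Writable, not java.io.Serializable, so it must be wrapped in Spark's SerializableWritable before it can travel as a broadcast variable. A minimal sketch of the pattern, assuming a local SparkContext and an empty JobConf (both hypothetical, not part of this commit):

    import org.apache.hadoop.mapred.JobConf
    import org.apache.spark.{SerializableWritable, SparkContext}
    import org.apache.spark.broadcast.Broadcast

    val sc = new SparkContext("local", "broadcast-conf-demo")

    // Wrap the JobConf: a bare Writable cannot be Java-serialized for broadcast.
    val conf = new JobConf()
    val confBroadcast: Broadcast[SerializableWritable[JobConf]] =
      sc.broadcast(new SerializableWritable(conf))

    // On an executor, unwrap twice: Broadcast.value returns the wrapper,
    // and SerializableWritable.value returns the JobConf itself.
    val jobConf: JobConf = confBroadcast.value.value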
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 47fe743880..4a0df35eb9 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -333,7 +333,11 @@ class SparkContext(
       valueClass: Class[V],
       minSplits: Int = defaultMinSplits
       ): RDD[(K, V)] = {
-    new HadoopDatasetRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
+    // Add necessary security credentials to the JobConf before broadcasting it.
+    SparkEnv.get.hadoop.addCredentials(conf)
+    // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it.
+    val confBroadcast = broadcast(new SerializableWritable(conf))
+    new HadoopDatasetRDD(this, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
   /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
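
For context, the method modified above (SparkContext's hadoopRDD, judging by the signature) keeps its public shape, so a caller is unaffected. A hypothetical usage sketch, with an illustrative input path and formats that are not part of this commit:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}

    // Describe a text dataset with a JobConf, then hand it to hadoopRDD.
    val jobConf = new JobConf()
    FileInputFormat.setInputPaths(jobConf, "hdfs://namenode:8020/data/input")
    val records = sc.hadoopRDD(jobConf, classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text])

With this patch, hadoopRDD attaches security credentials and broadcasts the wrapped conf once, before constructing the HadoopDatasetRDD.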
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 1ae8e41162..dd9fc7b79a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -69,19 +69,13 @@ class HadoopFileRDD[K, V](
  */
 class HadoopDatasetRDD[K, V](
     sc: SparkContext,
-    @transient conf: JobConf,
+    confBroadcast: Broadcast[SerializableWritable[JobConf]],
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
     minSplits: Int)
   extends HadoopRDD[K, V](sc, inputFormatClass, keyClass, valueClass, minSplits) {
 
-  // Add necessary security credentials to the JobConf before broadcasting it.
-  SparkEnv.get.hadoop.addCredentials(conf)
-
-  // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it.
-  private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
-
   override def getJobConf(): JobConf = confBroadcast.value.value
 }
 
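
Net effect of the two hunks: credential handling and the broadcast move out of HadoopDatasetRDD's constructor and into SparkContext, so the RDD simply receives an already-broadcast conf and unwraps it in getJobConf() on the executors. A plausible motivation, though the commit message does not state one, is to keep SparkEnv-dependent setup at job-definition time on the driver rather than inside RDD construction.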