From ef34cfb26cffefac4c39c5a6006b836f1e325d09 Mon Sep 17 00:00:00 2001
From: Harvey
Date: Sun, 22 Sep 2013 14:43:58 -0700
Subject: Move Configuration broadcasts to SparkContext.

---
 core/src/main/scala/org/apache/spark/SparkContext.scala  | 6 +++++-
 core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 8 +-------
 2 files changed, 6 insertions(+), 8 deletions(-)

(limited to 'core/src/main/scala')

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 47fe743880..4a0df35eb9 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -333,7 +333,11 @@ class SparkContext(
       valueClass: Class[V],
       minSplits: Int = defaultMinSplits
       ): RDD[(K, V)] = {
-    new HadoopDatasetRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
+    // Add necessary security credentials to the JobConf before broadcasting it.
+    SparkEnv.get.hadoop.addCredentials(conf)
+    // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it.
+    val confBroadcast = broadcast(new SerializableWritable(conf))
+    new HadoopDatasetRDD(this, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
   /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 1ae8e41162..dd9fc7b79a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -69,19 +69,13 @@ class HadoopFileRDD[K, V](
  */
 class HadoopDatasetRDD[K, V](
     sc: SparkContext,
-    @transient conf: JobConf,
+    confBroadcast: Broadcast[SerializableWritable[JobConf]],
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
     minSplits: Int)
   extends HadoopRDD[K, V](sc, inputFormatClass, keyClass, valueClass, minSplits) {
 
-  // Add necessary security credentials to the JobConf before broadcasting it.
-  SparkEnv.get.hadoop.addCredentials(conf)
-
-  // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it.
-  private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
-
   override def getJobConf(): JobConf = confBroadcast.value.value
 }
-- 
cgit v1.2.3
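
A minimal, self-contained sketch of the broadcast pattern this patch moves into
SparkContext, assuming the Spark 0.8-era API shown above (SparkContext.broadcast,
SerializableWritable in org.apache.spark). The object name and app name here are
illustrative, not part of the patch:

    import org.apache.hadoop.mapred.JobConf
    import org.apache.spark.{SerializableWritable, SparkContext}
    import org.apache.spark.broadcast.Broadcast

    // Illustrative only: ConfBroadcastSketch is not part of the patch.
    object ConfBroadcastSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "conf-broadcast-sketch")

        // Driver side: a JobConf is a Hadoop Writable but not
        // java.io.Serializable, so wrap it in SerializableWritable before
        // broadcasting. Broadcasting ships the ~10 KB conf to each executor
        // once, instead of serializing it with every task.
        val conf = new JobConf()
        val confBroadcast: Broadcast[SerializableWritable[JobConf]] =
          sc.broadcast(new SerializableWritable(conf))

        // Task side: unwrap twice, as the patched getJobConf() does.
        // Broadcast.value yields the SerializableWritable, whose .value
        // is the JobConf itself.
        val jobConf: JobConf = confBroadcast.value.value

        sc.stop()
      }
    }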