diff options
authorHarvey Feng <harvey@databricks.com>2013-10-05 17:39:17 -0700
committerHarvey Feng <harvey@databricks.com>2013-10-05 17:53:58 -0700
commit6a2bbec5e3840cea5c128d521fe91050de8689db (patch)
parent96929f28bb9c929ca3309dbe99910097f5eb3c8c (diff)
Some comments regarding JobConf and InputFormat caching for HadoopRDDs.
2 files changed, 25 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index f416b95afb..993ba6bd3d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -27,14 +27,16 @@ import org.apache.hadoop.mapred.JobConf
* Contains util methods to interact with Hadoop from spark.
class SparkHadoopUtil {
- // A general map for metadata needed during HadoopRDD split computation (e.g., HadoopFileRDD uses
- // this to cache JobConfs).
+ // A general, soft-reference map for metadata needed during HadoopRDD split computation
+ // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
- // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
+ // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop
+ // subsystems
def newConfiguration(): Configuration = new Configuration()
- // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
+ // Add any user credentials to the job conf which are necessary for running on a secure Hadoop
+ // cluster
def addCredentials(conf: JobConf) {}
def isYarnMode(): Boolean = { false }
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 51e5bb88d2..d3b3fffd40 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -52,8 +52,15 @@ class HadoopFileRDD[K, V](
override def getJobConf(): JobConf = {
if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+ // getJobConf() has been called previously, so there is already a local cache of the JobConf
+ // needed by this RDD.
return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
} else {
+ // Create a new JobConf, set the input file/directory paths to read from, and cache the
+ // JobConf (i.e., in a shared hash map in the slave's JVM process that's accessible through
+ // HadoopRDD.putCachedMetadata()), so that we only create one copy across multiple
+ // getJobConf() calls for this RDD in the local process.
+ // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
val newJobConf = new JobConf(broadcastedConf.value.value)
FileInputFormat.setInputPaths(newJobConf, path)
HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
@@ -112,10 +119,16 @@ class HadoopRDD[K, V](
protected def getJobConf(): JobConf = {
val conf: Configuration = broadcastedConf.value.value
if (conf.isInstanceOf[JobConf]) {
+ // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
return conf.asInstanceOf[JobConf]
} else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+ // getJobConf() has been called previously, so there is already a local cache of the JobConf
+ // needed by this RDD.
return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
} else {
+ // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
+ // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
+ // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
val newJobConf = new JobConf(broadcastedConf.value.value)
HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
return newJobConf
@@ -126,6 +139,8 @@ class HadoopRDD[K, V](
if (HadoopRDD.containsCachedMetadata(inputFormatCacheKey)) {
return HadoopRDD.getCachedMetadata(inputFormatCacheKey).asInstanceOf[InputFormat[K, V]]
+ // Once an InputFormat for this RDD is created, cache it so that only one reflection call is
+ // done in each local process.
val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
.asInstanceOf[InputFormat[K, V]]
if (newInputFormat.isInstanceOf[Configurable]) {
@@ -197,6 +212,10 @@ class HadoopRDD[K, V](
private[spark] object HadoopRDD {
+ /**
+ * The three methods below are helpers for accessing the local map, a property of the SparkEnv of
+ * the local process.
+ */
def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key)
def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key)