aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala140
1 files changed, 117 insertions, 23 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 2cb6734e41..d3b3fffd40 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -19,6 +19,7 @@ package org.apache.spark.rdd
import java.io.EOFException
+import org.apache.hadoop.mapred.FileInputFormat
import org.apache.hadoop.mapred.InputFormat
import org.apache.hadoop.mapred.InputSplit
import org.apache.hadoop.mapred.JobConf
@@ -26,10 +27,47 @@ import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.ReflectionUtils
-import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv,
+ TaskContext}
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.util.NextIterator
import org.apache.hadoop.conf.{Configuration, Configurable}
+/**
+ * An RDD that reads a file (or multiple files) from Hadoop (e.g. files in HDFS, the local file
+ * system, or S3).
+ * This accepts a general, broadcasted Hadoop Configuration because those tend to remain the same
+ * across multiple reads; the 'path' is the only variable that is different across new JobConfs
+ * created from the Configuration.
+ */
+class HadoopFileRDD[K, V](
+ sc: SparkContext,
+ path: String,
+ broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+ inputFormatClass: Class[_ <: InputFormat[K, V]],
+ keyClass: Class[K],
+ valueClass: Class[V],
+ minSplits: Int)
+ extends HadoopRDD[K, V](sc, broadcastedConf, inputFormatClass, keyClass, valueClass, minSplits) {
+
+ override def getJobConf(): JobConf = {
+ if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+ // getJobConf() has been called previously, so there is already a local cache of the JobConf
+ // needed by this RDD.
+ return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+ } else {
+ // Create a new JobConf, set the input file/directory paths to read from, and cache the
+ // JobConf (i.e., in a shared hash map in the slave's JVM process that's accessible through
+ // HadoopRDD.putCachedMetadata()), so that we only create one copy across multiple
+ // getJobConf() calls for this RDD in the local process.
+ // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
+ val newJobConf = new JobConf(broadcastedConf.value.value)
+ FileInputFormat.setInputPaths(newJobConf, path)
+ HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+ return newJobConf
+ }
+ }
+}
/**
* A Spark split class that wraps around a Hadoop InputSplit.
@@ -45,29 +83,80 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
}
/**
- * An RDD that reads a Hadoop dataset as specified by a JobConf (e.g. files in HDFS, the local file
- * system, or S3, tables in HBase, etc).
+ * A base class that provides core functionality for reading data partitions stored in Hadoop.
*/
class HadoopRDD[K, V](
sc: SparkContext,
- @transient conf: JobConf,
+ broadcastedConf: Broadcast[SerializableWritable[Configuration]],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int)
extends RDD[(K, V)](sc, Nil) with Logging {
- // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
- private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
+ def this(
+ sc: SparkContext,
+ conf: JobConf,
+ inputFormatClass: Class[_ <: InputFormat[K, V]],
+ keyClass: Class[K],
+ valueClass: Class[V],
+ minSplits: Int) = {
+ this(
+ sc,
+ sc.broadcast(new SerializableWritable(conf))
+ .asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
+ inputFormatClass,
+ keyClass,
+ valueClass,
+ minSplits)
+ }
+
+ protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
+
+ protected val inputFormatCacheKey = "rdd_%d_input_format".format(id)
+
+ // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
+ protected def getJobConf(): JobConf = {
+ val conf: Configuration = broadcastedConf.value.value
+ if (conf.isInstanceOf[JobConf]) {
+ // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
+ return conf.asInstanceOf[JobConf]
+ } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+ // getJobConf() has been called previously, so there is already a local cache of the JobConf
+ // needed by this RDD.
+ return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+ } else {
+ // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
+ // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
+ // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
+ val newJobConf = new JobConf(broadcastedConf.value.value)
+ HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+ return newJobConf
+ }
+ }
+
+ protected def getInputFormat(conf: JobConf): InputFormat[K, V] = {
+ if (HadoopRDD.containsCachedMetadata(inputFormatCacheKey)) {
+ return HadoopRDD.getCachedMetadata(inputFormatCacheKey).asInstanceOf[InputFormat[K, V]]
+ }
+ // Once an InputFormat for this RDD is created, cache it so that only one reflection call is
+ // done in each local process.
+ val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
+ .asInstanceOf[InputFormat[K, V]]
+ if (newInputFormat.isInstanceOf[Configurable]) {
+ newInputFormat.asInstanceOf[Configurable].setConf(conf)
+ }
+ HadoopRDD.putCachedMetadata(inputFormatCacheKey, newInputFormat)
+ return newInputFormat
+ }
override def getPartitions: Array[Partition] = {
- val env = SparkEnv.get
- env.hadoop.addCredentials(conf)
- val inputFormat = createInputFormat(conf)
+ val jobConf = getJobConf()
+ val inputFormat = getInputFormat(jobConf)
if (inputFormat.isInstanceOf[Configurable]) {
- inputFormat.asInstanceOf[Configurable].setConf(conf)
+ inputFormat.asInstanceOf[Configurable].setConf(jobConf)
}
- val inputSplits = inputFormat.getSplits(conf, minSplits)
+ val inputSplits = inputFormat.getSplits(jobConf, minSplits)
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
@@ -75,22 +164,14 @@ class HadoopRDD[K, V](
array
}
- def createInputFormat(conf: JobConf): InputFormat[K, V] = {
- ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
- .asInstanceOf[InputFormat[K, V]]
- }
-
override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] {
val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
var reader: RecordReader[K, V] = null
- val conf = confBroadcast.value.value
- val fmt = createInputFormat(conf)
- if (fmt.isInstanceOf[Configurable]) {
- fmt.asInstanceOf[Configurable].setConf(conf)
- }
- reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
+ val jobConf = getJobConf()
+ val inputFormat = getInputFormat(jobConf)
+ reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback{ () => closeIfNeeded() }
@@ -127,5 +208,18 @@ class HadoopRDD[K, V](
// Do nothing. Hadoop RDD should not be checkpointed.
}
- def getConf: Configuration = confBroadcast.value.value
+ def getConf: Configuration = getJobConf()
+}
+
+private[spark] object HadoopRDD {
+ /**
+ * The three methods below are helpers for accessing the local map, a property of the SparkEnv of
+ * the local process.
+ */
+ def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key)
+
+ def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key)
+
+ def putCachedMetadata(key: String, value: Any) =
+ SparkEnv.get.hadoop.hadoopJobMetadata.put(key, value)
}