author     Reynold Xin <rxin@apache.org>   2013-10-12 21:23:26 -0700
committer  Reynold Xin <rxin@apache.org>   2013-10-12 21:23:26 -0700
commit     99796904ae9d00405584ac518b6144973a876e3c (patch)
tree       f99ab86d42221ce6d1d47ba071374837bf1bdeda
parent     dca80094d317363e1e0d7e32bc7dfd99faf943cf (diff)
parent     6c32aab87d4ea26086536fb988744d0d30792160 (diff)
Merge pull request #52 from harveyfeng/hadoop-closure
Add an optional closure parameter to HadoopRDD instantiation to use when creating local JobConfs. Having HadoopRDD accept this optional closure eliminates the need for the HadoopFileRDD added earlier. It makes the HadoopRDD more general, in that the caller can specify any JobConf initialization flow.
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala   | 29
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala  | 52
2 files changed, 26 insertions(+), 55 deletions(-)
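To illustrate what the new parameter buys, here is a hypothetical caller-side sketch (not part of this commit; the input path and the extra property are made up, and an existing SparkContext sc is assumed). The caller hands HadoopRDD a closure that initializes each JobConf created locally on a slave, so any JobConf initialization flow can be plugged in, not just setting input paths:

  import org.apache.hadoop.io.{LongWritable, Text}
  import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
  import org.apache.spark.SerializableWritable
  import org.apache.spark.rdd.HadoopRDD

  // Broadcast the (large) Hadoop Configuration once, as SparkContext itself does.
  val confBroadcast = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))

  // Any JobConf initialization can go in this closure; setting input paths is one example.
  val initJobConf = (jobConf: JobConf) => {
    FileInputFormat.setInputPaths(jobConf, "hdfs:///data/events") // hypothetical input path
    jobConf.set("some.custom.property", "value")                  // illustrative extra setting
  }

  val rdd = new HadoopRDD[LongWritable, Text](
    sc,
    confBroadcast,
    Some(initJobConf),        // applied to every JobConf the RDD creates locally
    classOf[TextInputFormat],
    classOf[LongWritable],
    classOf[Text],
    2 /* minSplits */)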
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b884ae5879..b7a8179786 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -335,7 +335,7 @@ class SparkContext(
}
/**
- * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf giving its InputFormat and any
+ * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and any
* other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
* etc).
*/
@@ -361,24 +361,15 @@ class SparkContext(
): RDD[(K, V)] = {
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
- hadoopFile(path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
- }
-
- /**
- * Get an RDD for a Hadoop file with an arbitrary InputFormat. Accept a Hadoop Configuration
- * that has already been broadcast, assuming that it's safe to use it to construct a
- * HadoopFileRDD (i.e., except for file 'path', all other configuration properties can be reused).
- */
- def hadoopFile[K, V](
- path: String,
- confBroadcast: Broadcast[SerializableWritable[Configuration]],
- inputFormatClass: Class[_ <: InputFormat[K, V]],
- keyClass: Class[K],
- valueClass: Class[V],
- minSplits: Int
- ): RDD[(K, V)] = {
- new HadoopFileRDD(
- this, path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
+ val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
+ new HadoopRDD(
+ this,
+ confBroadcast,
+ Some(setInputPathsFunc),
+ inputFormatClass,
+ keyClass,
+ valueClass,
+ minSplits)
}
/**
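For comparison, the public entry point is unchanged from a user's perspective. A hypothetical call (path illustrative, existing SparkContext sc assumed) now ends up in the HadoopRDD construction shown in the hunk above, with the path supplied through setInputPathsFunc rather than through the removed HadoopFileRDD:

  import org.apache.hadoop.io.{LongWritable, Text}
  import org.apache.hadoop.mapred.TextInputFormat

  // Same hadoopFile API as before this change.
  val records = sc.hadoopFile(
    "hdfs:///data/events",    // hypothetical path
    classOf[TextInputFormat],
    classOf[LongWritable],
    classOf[Text],
    2 /* minSplits */)        // records: RDD[(LongWritable, Text)]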
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index d3b3fffd40..2d394abfd9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -33,41 +33,6 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.util.NextIterator
import org.apache.hadoop.conf.{Configuration, Configurable}
-/**
- * An RDD that reads a file (or multiple files) from Hadoop (e.g. files in HDFS, the local file
- * system, or S3).
- * This accepts a general, broadcasted Hadoop Configuration because those tend to remain the same
- * across multiple reads; the 'path' is the only variable that is different across new JobConfs
- * created from the Configuration.
- */
-class HadoopFileRDD[K, V](
- sc: SparkContext,
- path: String,
- broadcastedConf: Broadcast[SerializableWritable[Configuration]],
- inputFormatClass: Class[_ <: InputFormat[K, V]],
- keyClass: Class[K],
- valueClass: Class[V],
- minSplits: Int)
- extends HadoopRDD[K, V](sc, broadcastedConf, inputFormatClass, keyClass, valueClass, minSplits) {
-
- override def getJobConf(): JobConf = {
- if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
- // getJobConf() has been called previously, so there is already a local cache of the JobConf
- // needed by this RDD.
- return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
- } else {
- // Create a new JobConf, set the input file/directory paths to read from, and cache the
- // JobConf (i.e., in a shared hash map in the slave's JVM process that's accessible through
- // HadoopRDD.putCachedMetadata()), so that we only create one copy across multiple
- // getJobConf() calls for this RDD in the local process.
- // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
- val newJobConf = new JobConf(broadcastedConf.value.value)
- FileInputFormat.setInputPaths(newJobConf, path)
- HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
- return newJobConf
- }
- }
-}
/**
* A Spark split class that wraps around a Hadoop InputSplit.
@@ -83,11 +48,24 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
}
/**
- * A base class that provides core functionality for reading data partitions stored in Hadoop.
+ * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
+ * sources in HBase, or S3).
+ *
+ * @param sc The SparkContext to associate the RDD with.
+ * @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
+ * variable references an instance of JobConf, then that JobConf will be used for the Hadoop job.
+ * Otherwise, a new JobConf will be created on each slave using the enclosed Configuration.
+ * @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD
+ * creates.
+ * @param inputFormatClass Storage format of the data to be read.
+ * @param keyClass Class of the key associated with the inputFormatClass.
+ * @param valueClass Class of the value associated with the inputFormatClass.
+ * @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
*/
class HadoopRDD[K, V](
sc: SparkContext,
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+ initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
@@ -105,6 +83,7 @@ class HadoopRDD[K, V](
sc,
sc.broadcast(new SerializableWritable(conf))
.asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
+ None /* initLocalJobConfFuncOpt */,
inputFormatClass,
keyClass,
valueClass,
@@ -130,6 +109,7 @@ class HadoopRDD[K, V](
// local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
// The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
val newJobConf = new JobConf(broadcastedConf.value.value)
+ initLocalJobConfFuncOpt.map(f => f(newJobConf))
HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
return newJobConf
}
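Putting the hunks together, here is a condensed sketch of the resulting HadoopRDD.getJobConf() flow, reconstructed from the removed HadoopFileRDD.getJobConf() above and this hunk (exact modifiers and the elided branches may differ from the actual source):

  def getJobConf(): JobConf = {
    if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
      // A JobConf for this RDD was already built in this JVM; reuse it so we don't
      // recreate ~10KB of temporary objects on every getJobConf() call (less GC).
      HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
    } else {
      // Build a fresh JobConf from the broadcast Configuration, let the caller's
      // optional closure initialize it (e.g. set input paths), then cache it locally.
      val newJobConf = new JobConf(broadcastedConf.value.value)
      initLocalJobConfFuncOpt.map(f => f(newJobConf))
      HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
      newJobConf
    }
  }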