author     Harvey Feng <harvey@databricks.com>    2013-10-10 16:33:56 -0700
committer  Harvey Feng <harvey@databricks.com>    2013-10-10 16:35:52 -0700
commit     5a99e67894e3a7716a8a1eaa19ce17e7a8abf5b6 (patch)
tree       2f80aaf70b351f2e6f275fcba238fe198abc38c9 /core
parent     02f37ee8530bcd5afefe403147bbf411464fa773 (diff)
download   spark-5a99e67894e3a7716a8a1eaa19ce17e7a8abf5b6.tar.gz
           spark-5a99e67894e3a7716a8a1eaa19ce17e7a8abf5b6.tar.bz2
           spark-5a99e67894e3a7716a8a1eaa19ce17e7a8abf5b6.zip
Add an optional closure parameter to HadoopRDD instantiation to be used when creating any local JobConfs.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala  |  49
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala |  52
2 files changed, 48 insertions(+), 53 deletions(-)
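
For orientation before the diff: with this change the public hadoopFile() entry point keeps its old signature, while the path-specific work moves into a closure handed to HadoopRDD. A minimal usage sketch, assuming a live SparkContext named sc and a purely hypothetical input path (TextInputFormat, LongWritable and Text are the standard Hadoop classes):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Unchanged caller-facing API: hadoopFile() broadcasts the Configuration once and
// internally passes a setInputPaths closure to HadoopRDD instead of using HadoopFileRDD.
val lines = sc.hadoopFile(
  "hdfs:///tmp/example.txt",   // hypothetical path
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text])
println(lines.count())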
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index febcf9c6ee..d9c6264855 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -330,7 +330,7 @@ class SparkContext(
}
/**
- * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf giving its InputFormat and any
+ * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and any
* other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
* etc).
*/
@@ -346,34 +346,49 @@ class SparkContext(
new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
}
- /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
- def hadoopFile[K, V](
+ /**
+ * Get an RDD for a Hadoop file with an arbitrary InputFormat. Accept a Hadoop Configuration
+ * that has already been broadcast and use it to construct JobConfs local to each process. These
+ * JobConfs will be initialized using an optional, user-specified closure.
+ */
+ def hadoopRDD[K, V](
path: String,
+ confBroadcast: Broadcast[SerializableWritable[Configuration]],
+ initLocalJobConfOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int = defaultMinSplits
- ): RDD[(K, V)] = {
- // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
- val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
- hadoopFile(path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
+ minSplits: Int
+ ): RDD[(K, V)] = {
+ new HadoopRDD(
+ this,
+ confBroadcast,
+ initLocalJobConfOpt,
+ inputFormatClass,
+ keyClass,
+ valueClass,
+ minSplits)
}
- /**
- * Get an RDD for a Hadoop file with an arbitray InputFormat. Accept a Hadoop Configuration
- * that has already been broadcast, assuming that it's safe to use it to construct a
- * HadoopFileRDD (i.e., except for file 'path', all other configuration properties can be resued).
- */
+ /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
def hadoopFile[K, V](
path: String,
- confBroadcast: Broadcast[SerializableWritable[Configuration]],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int
+ minSplits: Int = defaultMinSplits
): RDD[(K, V)] = {
- new HadoopFileRDD(
- this, path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
+ // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
+ val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
+ val setInputPathsFunc = Some((jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path))
+ new HadoopRDD(
+ this,
+ confBroadcast,
+ setInputPathsFunc,
+ inputFormatClass,
+ keyClass,
+ valueClass,
+ minSplits)
}
/**
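
To make the new parameter concrete, here is a hedged sketch of the kind of closure a caller could supply as initLocalJobConfOpt when using the broadcast-accepting overload; the path and the extra property set here are illustrative assumptions, not something this patch requires:

import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

// Hypothetical initialization closure: it runs when a local JobConf is first
// built from the broadcast Configuration on each process, so per-RDD settings
// (like the input path) do not need to be baked into the broadcast itself.
val initLocalJobConf: Option[JobConf => Unit] = Some((jobConf: JobConf) => {
  FileInputFormat.setInputPaths(jobConf, "hdfs:///data/events")   // hypothetical path
  jobConf.set("mapred.min.split.size", "134217728")               // illustrative tuning only
})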
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index d3b3fffd40..4ecdd65e9b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -33,41 +33,6 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.util.NextIterator
import org.apache.hadoop.conf.{Configuration, Configurable}
-/**
- * An RDD that reads a file (or multiple files) from Hadoop (e.g. files in HDFS, the local file
- * system, or S3).
- * This accepts a general, broadcasted Hadoop Configuration because those tend to remain the same
- * across multiple reads; the 'path' is the only variable that is different across new JobConfs
- * created from the Configuration.
- */
-class HadoopFileRDD[K, V](
- sc: SparkContext,
- path: String,
- broadcastedConf: Broadcast[SerializableWritable[Configuration]],
- inputFormatClass: Class[_ <: InputFormat[K, V]],
- keyClass: Class[K],
- valueClass: Class[V],
- minSplits: Int)
- extends HadoopRDD[K, V](sc, broadcastedConf, inputFormatClass, keyClass, valueClass, minSplits) {
-
- override def getJobConf(): JobConf = {
- if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
- // getJobConf() has been called previously, so there is already a local cache of the JobConf
- // needed by this RDD.
- return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
- } else {
- // Create a new JobConf, set the input file/directory paths to read from, and cache the
- // JobConf (i.e., in a shared hash map in the slave's JVM process that's accessible through
- // HadoopRDD.putCachedMetadata()), so that we only create one copy across multiple
- // getJobConf() calls for this RDD in the local process.
- // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
- val newJobConf = new JobConf(broadcastedConf.value.value)
- FileInputFormat.setInputPaths(newJobConf, path)
- HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
- return newJobConf
- }
- }
-}
/**
* A Spark split class that wraps around a Hadoop InputSplit.
@@ -83,11 +48,24 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
}
/**
- * A base class that provides core functionality for reading data partitions stored in Hadoop.
+ * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
+ * sources in HBase, or S3).
+ *
+ * @param sc The SparkContext to associate the RDD with.
+ * @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
+ * variable references an instance of JobConf, then that JobConf will be used for the Hadoop job.
+ * Otherwise, a new JobConf will be created on each slave using the enclosed Configuration.
+ * @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD
+ * creates.
+ * @param inputFormatClass Storage format of the data to be read.
+ * @param keyClass Class of the key associated with the inputFormatClass.
+ * @param valueClass Class of the value associated with the inputFormatClass.
+ * @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
*/
class HadoopRDD[K, V](
sc: SparkContext,
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+ initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
@@ -105,6 +83,7 @@ class HadoopRDD[K, V](
sc,
sc.broadcast(new SerializableWritable(conf))
.asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
+ None /* initLocalJobConfFuncOpt */,
inputFormatClass,
keyClass,
valueClass,
@@ -130,6 +109,7 @@ class HadoopRDD[K, V](
// local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
// The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
val newJobConf = new JobConf(broadcastedConf.value.value)
+ initLocalJobConfFuncOpt.map(f => f(newJobConf))
HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
return newJobConf
}
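
In condensed form, the per-process JobConf path after this change looks roughly like the following hedged sketch; the mutable map stands in for HadoopRDD's real metadata cache, and the names localJobConf and cacheKey are illustrative, not from the patch:

import scala.collection.mutable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf

// Illustrative stand-in for the shared per-process cache that HadoopRDD accesses
// through putCachedMetadata()/getCachedMetadata().
val jobConfCache = mutable.Map.empty[String, JobConf]

def localJobConf(
    cacheKey: String,
    broadcastConf: Configuration,
    initLocalJobConfFuncOpt: Option[JobConf => Unit]): JobConf =
  jobConfCache.getOrElseUpdate(cacheKey, {
    // Build the JobConf once per process from the broadcast Configuration,
    // then let the optional closure (e.g. FileInputFormat.setInputPaths) customize it.
    val newJobConf = new JobConf(broadcastConf)
    initLocalJobConfFuncOpt.foreach(f => f(newJobConf))
    newJobConf
  })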