author    Harvey <h.feng@berkeley.edu>  2013-09-22 03:05:02 -0700
committer Harvey <h.feng@berkeley.edu>  2013-09-22 03:09:17 -0700
commit    a6eeb5ffd54956667ec4e793149fdab90041ad6c (patch)
tree      df5d2a0aa987a06f5b98e3277dff5f1179335233 /core
parent    be0fc7246f31160cc9ea441135630a84dcd65da1 (diff)
Add a cache for HadoopRDD metadata needed during computation.
Currently, the cache is in SparkHadoopUtil, since it's conveniently a member of SparkEnv.
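
For orientation, the pattern this patch introduces is a per-JVM, soft-valued metadata cache keyed by strings such as "rdd_<id>_job_conf". Below is a minimal, standalone sketch of that pattern (illustrative names only, not Spark's classes; it assumes Guava is on the classpath, as the patch itself does):

import com.google.common.collect.MapMaker

object MetadataCacheSketch {
  // Analogous to SparkHadoopUtil.hadoopJobMetadata in this patch: soft values let the
  // JVM reclaim cached entries under memory pressure.
  private val metadata = new MapMaker().softValues().makeMap[String, Any]()

  // Get-or-create, mirroring the check-then-put structure of HadoopFileRDD.getJobConf().
  def getOrPut[T](key: String)(create: => T): T = {
    val cached = metadata.get(key)
    if (cached != null) {
      cached.asInstanceOf[T]
    } else {
      val value = create
      metadata.put(key, value)
      value
    }
  }

  def main(args: Array[String]): Unit = {
    val first = getOrPut("rdd_0_job_conf") { new StringBuilder("expensive to build") }
    val second = getOrPut("rdd_0_job_conf") { new StringBuilder("never built") }
    println(first eq second)  // true: the second call reuses the cached instance
  }
}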
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/CacheManager.scala            |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala            | 29
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala  |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala           | 77
4 files changed, 79 insertions(+), 37 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 68b99ca125..3d36761cda 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -26,7 +26,9 @@ import org.apache.spark.rdd.RDD
sure a node doesn't load two copies of an RDD at once.
*/
private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
- private val loading = new HashSet[String]
+
+ /** Keys of RDD splits that are being computed/loaded. */
+ private val loading = new HashSet[String]()
/** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 894cc67acf..47fe743880 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
import org.apache.mesos.MesosNativeLibrary
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.LocalSparkCluster
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
@@ -342,16 +343,26 @@ class SparkContext(
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int = defaultMinSplits
- ) : RDD[(K, V)] = {
- val broadcastHadoopConfiguration = broadcast(new SerializableWritable(hadoopConfiguration))
+ ): RDD[(K, V)] = {
+ val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
+ hadoopFile(path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
+ }
+
+ /**
+ * Get an RDD for a Hadoop file with an arbitrary InputFormat. Accepts a Hadoop Configuration
+ * that has already been broadcast, assuming that it's safe to use it to construct a
+ * HadoopFileRDD (i.e., except for the file 'path', all other configuration properties can be reused).
+ */
+ def hadoopFile[K, V](
+ path: String,
+ confBroadcast: Broadcast[SerializableWritable[Configuration]],
+ inputFormatClass: Class[_ <: InputFormat[K, V]],
+ keyClass: Class[K],
+ valueClass: Class[V],
+ minSplits: Int
+ ): RDD[(K, V)] = {
new HadoopFileRDD(
- this,
- path,
- broadcastHadoopConfiguration,
- inputFormatClass,
- keyClass,
- valueClass,
- minSplits)
+ this, path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
}
/**
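
As a usage illustration of the overload added above: a driver can broadcast the Hadoop Configuration once and hand the same broadcast to several Hadoop file reads, so only 'path' varies per RDD. This is a hedged sketch (the paths, app name, and TextInputFormat choice are illustrative), not code from this patch:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.{SerializableWritable, SparkContext}

object BroadcastConfExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "broadcast-conf-example")
    // Broadcast the (potentially ~10 KB) Hadoop Configuration once...
    val confBroadcast = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))
    // ...then construct several Hadoop file RDDs that share it; only the input path differs.
    val rdds = Seq("/data/part-0", "/data/part-1").map { path =>
      sc.hadoopFile(path, confBroadcast, classOf[TextInputFormat],
        classOf[LongWritable], classOf[Text], sc.defaultMinSplits)
    }
    println(rdds.size)  // the RDDs are lazy; nothing is read until an action runs
    sc.stop()
  }
}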
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 0a5f4c368f..f416b95afb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -16,6 +16,9 @@
*/
package org.apache.spark.deploy
+
+import com.google.common.collect.MapMaker
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
@@ -24,6 +27,9 @@ import org.apache.hadoop.mapred.JobConf
* Contains util methods to interact with Hadoop from spark.
*/
class SparkHadoopUtil {
+ // A general map for metadata needed during HadoopRDD split computation (e.g., HadoopFileRDD uses
+ // this to cache JobConfs).
+ private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
// Return an appropriate (subclass of) Configuration. Creating a config can initialize some Hadoop subsystems.
def newConfiguration(): Configuration = new Configuration()
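
A brief note on this placement: since SparkHadoopUtil is reachable through SparkEnv.get.hadoop (as the commit message notes), every HadoopRDD in an executor JVM shares this single cache instead of each task rebuilding its own JobConf, and the soft values let cached JobConfs and InputFormats be garbage-collected under memory pressure rather than being pinned for the executor's lifetime.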
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index e259ef52a9..1ae8e41162 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -37,7 +37,7 @@ import org.apache.hadoop.conf.{Configuration, Configurable}
* An RDD that reads a file (or multiple files) from Hadoop (e.g. files in HDFS, the local file
* system, or S3).
* This accepts a general, broadcasted Hadoop Configuration because those tend to remain the same
- * across multiple reads; the 'path' is the only variable that is different acrodd new JobConfs
+ * across multiple reads; the 'path' is the only variable that is different across new JobConfs
* created from the Configuration.
*/
class HadoopFileRDD[K, V](
@@ -50,13 +50,18 @@ class HadoopFileRDD[K, V](
minSplits: Int)
extends HadoopRDD[K, V](sc, inputFormatClass, keyClass, valueClass, minSplits) {
- private val localJobConf: JobConf = {
- val jobConf = new JobConf(hadoopConfBroadcast.value.value)
- FileInputFormat.setInputPaths(jobConf, path)
- jobConf
- }
+ private val jobConfCacheKey = "rdd_%d_job_conf".format(id)
- override def getJobConf: JobConf = localJobConf
+ override def getJobConf(): JobConf = {
+ if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+ return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+ } else {
+ val newJobConf = new JobConf(hadoopConfBroadcast.value.value)
+ FileInputFormat.setInputPaths(newJobConf, path)
+ HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+ return newJobConf
+ }
+ }
}
/**
@@ -71,10 +76,13 @@ class HadoopDatasetRDD[K, V](
minSplits: Int)
extends HadoopRDD[K, V](sc, inputFormatClass, keyClass, valueClass, minSplits) {
+ // Add necessary security credentials to the JobConf before broadcasting it.
+ SparkEnv.get.hadoop.addCredentials(conf)
+
// A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it.
private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
- override def getJobConf: JobConf = confBroadcast.value.value
+ override def getJobConf(): JobConf = confBroadcast.value.value
}
/**
@@ -101,20 +109,31 @@ abstract class HadoopRDD[K, V](
minSplits: Int)
extends RDD[(K, V)](sc, Nil) with Logging {
- // The JobConf used to obtain input splits for Hadoop reads. The subclass is responsible for
- // determining how the JobConf is initialized.
- protected def getJobConf: JobConf
+ private val inputFormatCacheKey = "rdd_%d_input_format".format(id)
+
+ // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
+ protected def getJobConf(): JobConf
- def getConf: Configuration = getJobConf
+ def getInputFormat(conf: JobConf): InputFormat[K, V] = {
+ if (HadoopRDD.containsCachedMetadata(inputFormatCacheKey)) {
+ return HadoopRDD.getCachedMetadata(inputFormatCacheKey).asInstanceOf[InputFormat[K, V]]
+ }
+ val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
+ .asInstanceOf[InputFormat[K, V]]
+ if (newInputFormat.isInstanceOf[Configurable]) {
+ newInputFormat.asInstanceOf[Configurable].setConf(conf)
+ }
+ HadoopRDD.putCachedMetadata(inputFormatCacheKey, newInputFormat)
+ return newInputFormat
+ }
override def getPartitions: Array[Partition] = {
- val env = SparkEnv.get
- env.hadoop.addCredentials(getJobConf)
- val inputFormat = createInputFormat(getJobConf)
+ val jobConf = getJobConf()
+ val inputFormat = getInputFormat(jobConf)
if (inputFormat.isInstanceOf[Configurable]) {
- inputFormat.asInstanceOf[Configurable].setConf(getJobConf)
+ inputFormat.asInstanceOf[Configurable].setConf(jobConf)
}
- val inputSplits = inputFormat.getSplits(getJobConf, minSplits)
+ val inputSplits = inputFormat.getSplits(jobConf, minSplits)
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
@@ -122,21 +141,14 @@ abstract class HadoopRDD[K, V](
array
}
- def createInputFormat(conf: JobConf): InputFormat[K, V] = {
- ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
- .asInstanceOf[InputFormat[K, V]]
- }
-
override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] {
val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
var reader: RecordReader[K, V] = null
- val fmt = createInputFormat(getJobConf)
- if (fmt.isInstanceOf[Configurable]) {
- fmt.asInstanceOf[Configurable].setConf(getJobConf)
- }
- reader = fmt.getRecordReader(split.inputSplit.value, getJobConf, Reporter.NULL)
+ val jobConf = getJobConf()
+ val inputFormat = getInputFormat(jobConf)
+ reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback{ () => closeIfNeeded() }
@@ -172,4 +184,15 @@ abstract class HadoopRDD[K, V](
override def checkpoint() {
// Do nothing. Hadoop RDD should not be checkpointed.
}
+
+ def getConf: Configuration = getJobConf()
+}
+
+object HadoopRDD {
+ def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key)
+
+ def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key)
+
+ def putCachedMetadata(key: String, value: Any) =
+ SparkEnv.get.hadoop.hadoopJobMetadata.put(key, value)
}
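
One follow-up on the helpers in this companion object: containsCachedMetadata followed by putCachedMetadata is a check-then-act sequence, so two tasks in the same executor can race, both construct a value, and the later put wins. That appears benign here, since either JobConf or InputFormat instance works, but the ConcurrentMap returned by MapMaker also supports putIfAbsent if first-writer-wins semantics were ever wanted. A hedged, standalone sketch of that variant (illustrative names, not part of this patch):

import java.util.concurrent.ConcurrentMap
import com.google.common.collect.MapMaker

object PutIfAbsentSketch {
  private val cache: ConcurrentMap[String, Any] =
    new MapMaker().softValues().makeMap[String, Any]()

  // Construction may still race, but every caller observes the same stored instance.
  def cached[T](key: String)(create: => T): T = {
    val existing = cache.get(key)
    if (existing != null) return existing.asInstanceOf[T]
    val fresh = create
    val winner = cache.putIfAbsent(key, fresh)
    (if (winner != null) winner else fresh).asInstanceOf[T]
  }
}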