From a6eeb5ffd54956667ec4e793149fdab90041ad6c Mon Sep 17 00:00:00 2001
From: Harvey
Date: Sun, 22 Sep 2013 03:05:02 -0700
Subject: Add a cache for HadoopRDD metadata needed during computation.

Currently, the cache is in SparkHadoopUtil, since it's conveniently a member of the SparkEnv.
---
 .../main/scala/org/apache/spark/CacheManager.scala |  4 +-
 .../main/scala/org/apache/spark/SparkContext.scala | 29 +++++---
 .../org/apache/spark/deploy/SparkHadoopUtil.scala  |  6 ++
 .../scala/org/apache/spark/rdd/HadoopRDD.scala     | 77 ++++++++++++++--------
 4 files changed, 79 insertions(+), 37 deletions(-)

(limited to 'core')

diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 68b99ca125..3d36761cda 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -26,7 +26,9 @@ import org.apache.spark.rdd.RDD
     sure a node doesn't load two copies of an RDD at once.
   */
 private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
-  private val loading = new HashSet[String]
+
+  /** Keys of RDD splits that are being computed/loaded. */
+  private val loading = new HashSet[String]()
 
   /** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
   def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 894cc67acf..47fe743880 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
 
 import org.apache.mesos.MesosNativeLibrary
 
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.LocalSparkCluster
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
@@ -342,16 +343,26 @@ class SparkContext(
       keyClass: Class[K],
       valueClass: Class[V],
       minSplits: Int = defaultMinSplits
-    ) : RDD[(K, V)] = {
-    val broadcastHadoopConfiguration = broadcast(new SerializableWritable(hadoopConfiguration))
+    ): RDD[(K, V)] = {
+    val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
+    hadoopFile(path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
+  }
+
+  /**
+   * Get an RDD for a Hadoop file with an arbitrary InputFormat. Accepts a Hadoop Configuration
+   * that has already been broadcast, assuming that it's safe to use it to construct a
+   * HadoopFileRDD (i.e., except for the file 'path', all other configuration properties can be reused).
+   */
+  def hadoopFile[K, V](
+      path: String,
+      confBroadcast: Broadcast[SerializableWritable[Configuration]],
+      inputFormatClass: Class[_ <: InputFormat[K, V]],
+      keyClass: Class[K],
+      valueClass: Class[V],
+      minSplits: Int
+    ): RDD[(K, V)] = {
     new HadoopFileRDD(
-      this,
-      path,
-      broadcastHadoopConfiguration,
-      inputFormatClass,
-      keyClass,
-      valueClass,
-      minSplits)
+      this, path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 0a5f4c368f..f416b95afb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -16,6 +16,9 @@
  */
 
 package org.apache.spark.deploy
+
+import com.google.common.collect.MapMaker
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 
@@ -24,6 +27,9 @@ import org.apache.hadoop.mapred.JobConf
  * Contains util methods to interact with Hadoop from spark.
  */
 class SparkHadoopUtil {
+  // A general map for metadata needed during HadoopRDD split computation (e.g., HadoopFileRDD uses
+  // this to cache JobConfs).
+  private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
 
   // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
   def newConfiguration(): Configuration = new Configuration()
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index e259ef52a9..1ae8e41162 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -37,7 +37,7 @@ import org.apache.hadoop.conf.{Configuration, Configurable}
 * An RDD that reads a file (or multiple files) from Hadoop (e.g. files in HDFS, the local file
 * system, or S3).
 * This accepts a general, broadcasted Hadoop Configuration because those tend to remain the same
- * across multiple reads; the 'path' is the only variable that is different acrodd new JobConfs
+ * across multiple reads; the 'path' is the only variable that is different across new JobConfs
 * created from the Configuration.
 */
 class HadoopFileRDD[K, V](
@@ -50,13 +50,18 @@ class HadoopFileRDD[K, V](
     minSplits: Int)
   extends HadoopRDD[K, V](sc, inputFormatClass, keyClass, valueClass, minSplits) {
 
-  private val localJobConf: JobConf = {
-    val jobConf = new JobConf(hadoopConfBroadcast.value.value)
-    FileInputFormat.setInputPaths(jobConf, path)
-    jobConf
-  }
+  private val jobConfCacheKey = "rdd_%d_job_conf".format(id)
 
-  override def getJobConf: JobConf = localJobConf
+  override def getJobConf(): JobConf = {
+    if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+      return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+    } else {
+      val newJobConf = new JobConf(hadoopConfBroadcast.value.value)
+      FileInputFormat.setInputPaths(newJobConf, path)
+      HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+      return newJobConf
+    }
+  }
 }
 
 /**
@@ -71,10 +76,13 @@ class HadoopDatasetRDD[K, V](
     minSplits: Int)
   extends HadoopRDD[K, V](sc, inputFormatClass, keyClass, valueClass, minSplits) {
 
+  // Add necessary security credentials to the JobConf before broadcasting it.
+  SparkEnv.get.hadoop.addCredentials(conf)
+
   // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it.
   private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
 
-  override def getJobConf: JobConf = confBroadcast.value.value
+  override def getJobConf(): JobConf = confBroadcast.value.value
 }
 
 /**
@@ -101,20 +109,31 @@ abstract class HadoopRDD[K, V](
     minSplits: Int)
   extends RDD[(K, V)](sc, Nil) with Logging {
 
-  // The JobConf used to obtain input splits for Hadoop reads. The subclass is responsible for
-  // determining how the JobConf is initialized.
-  protected def getJobConf: JobConf
+  private val inputFormatCacheKey = "rdd_%d_input_format".format(id)
+
+  // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
+  protected def getJobConf(): JobConf
 
-  def getConf: Configuration = getJobConf
+  def getInputFormat(conf: JobConf): InputFormat[K, V] = {
+    if (HadoopRDD.containsCachedMetadata(inputFormatCacheKey)) {
+      return HadoopRDD.getCachedMetadata(inputFormatCacheKey).asInstanceOf[InputFormat[K, V]]
+    }
+    val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
+      .asInstanceOf[InputFormat[K, V]]
+    if (newInputFormat.isInstanceOf[Configurable]) {
+      newInputFormat.asInstanceOf[Configurable].setConf(conf)
+    }
+    HadoopRDD.putCachedMetadata(inputFormatCacheKey, newInputFormat)
+    return newInputFormat
+  }
 
   override def getPartitions: Array[Partition] = {
-    val env = SparkEnv.get
-    env.hadoop.addCredentials(getJobConf)
-    val inputFormat = createInputFormat(getJobConf)
+    val jobConf = getJobConf()
+    val inputFormat = getInputFormat(jobConf)
     if (inputFormat.isInstanceOf[Configurable]) {
-      inputFormat.asInstanceOf[Configurable].setConf(getJobConf)
+      inputFormat.asInstanceOf[Configurable].setConf(jobConf)
     }
-    val inputSplits = inputFormat.getSplits(getJobConf, minSplits)
+    val inputSplits = inputFormat.getSplits(jobConf, minSplits)
     val array = new Array[Partition](inputSplits.size)
     for (i <- 0 until inputSplits.size) {
       array(i) = new HadoopPartition(id, i, inputSplits(i))
@@ -122,21 +141,14 @@ abstract class HadoopRDD[K, V](
     array
   }
 
-  def createInputFormat(conf: JobConf): InputFormat[K, V] = {
-    ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
-      .asInstanceOf[InputFormat[K, V]]
-  }
-
   override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] {
     val split = theSplit.asInstanceOf[HadoopPartition]
     logInfo("Input split: " + split.inputSplit)
     var reader: RecordReader[K, V] = null
 
-    val fmt = createInputFormat(getJobConf)
-    if (fmt.isInstanceOf[Configurable]) {
-      fmt.asInstanceOf[Configurable].setConf(getJobConf)
-    }
-    reader = fmt.getRecordReader(split.inputSplit.value, getJobConf, Reporter.NULL)
+    val jobConf = getJobConf()
+    val inputFormat = getInputFormat(jobConf)
+    reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
 
     // Register an on-task-completion callback to close the input stream.
     context.addOnCompleteCallback{ () => closeIfNeeded() }
@@ -172,4 +184,15 @@ abstract class HadoopRDD[K, V](
   override def checkpoint() {
     // Do nothing. Hadoop RDD should not be checkpointed.
   }
+
+  def getConf: Configuration = getJobConf()
+}
+
+object HadoopRDD {
+  def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key)
+
+  def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key)
+
+  def putCachedMetadata(key: String, value: Any) =
+    SparkEnv.get.hadoop.hadoopJobMetadata.put(key, value)
 }
--
cgit v1.2.3
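Usage sketch (editor's illustration, not part of the patch): with this commit applied, the new broadcast-Configuration overload of SparkContext.hadoopFile lets a driver broadcast one Hadoop Configuration and reuse it for several file RDDs, while each HadoopFileRDD caches the JobConf it derives from that broadcast in SparkHadoopUtil.hadoopJobMetadata. The object name, application name, and input paths below are hypothetical; hadoopConfiguration and defaultMinSplits are assumed to be the public SparkContext members of this Spark version.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

import org.apache.spark.{SerializableWritable, SparkContext}

object HadoopFileCacheExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "HadoopFileCacheExample")

    // Broadcast one Hadoop Configuration and reuse it for several file RDDs. Each
    // HadoopFileRDD builds its JobConf from this broadcast value and caches it in
    // SparkEnv.get.hadoop.hadoopJobMetadata under a "rdd_<id>_job_conf" key, so the
    // JobConf is not rebuilt for every task.
    val confBroadcast = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))

    // The input paths below are placeholders.
    val logs = sc.hadoopFile("/data/logs", confBroadcast, classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text], sc.defaultMinSplits)
    val events = sc.hadoopFile("/data/events", confBroadcast, classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text], sc.defaultMinSplits)

    println("Total records: " + (logs.count() + events.count()))
    sc.stop()
  }
}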