author    Harvey Feng <harvey@databricks.com>  2013-09-26 15:49:42 -0700
committer Harvey Feng <harvey@databricks.com>  2013-09-26 15:49:42 -0700
commit    417085716ad0b451e4ab54cd3391a7ec2e1e3c5e (patch)
tree      1c6463a0f210eb0e36f0480247ff7c34a25a65d3 /core
parent    714fdabd99bbff3a0cdec5dcf06b021a3a3f2da8 (diff)
parent    ef34cfb26cffefac4c39c5a6006b836f1e325d09 (diff)
Merge remote-tracking branch 'oldsparkme/hadoopRDD-broadcast-change' into hadoop-config-cache
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/CacheManager.scala           |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala           |  31
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala |   6
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala          | 109
4 files changed, 120 insertions(+), 30 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 68b99ca125..3d36761cda 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -26,7 +26,9 @@ import org.apache.spark.rdd.RDD
sure a node doesn't load two copies of an RDD at once.
*/
private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
- private val loading = new HashSet[String]
+
+ /** Keys of RDD splits that are being computed/loaded. */
+ private val loading = new HashSet[String]()
/** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 912ce752fb..11e92945ec 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
import org.apache.mesos.MesosNativeLibrary
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.LocalSparkCluster
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
@@ -337,7 +338,11 @@ class SparkContext(
valueClass: Class[V],
minSplits: Int = defaultMinSplits
): RDD[(K, V)] = {
- new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
+ // Add necessary security credentials to the JobConf before broadcasting it.
+ SparkEnv.get.hadoop.addCredentials(conf)
+ // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it.
+ val confBroadcast = broadcast(new SerializableWritable(conf))
+ new HadoopDatasetRDD(this, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
}
/** Get an RDD for a Hadoop file with an arbitrary InputFormat */
@@ -347,10 +352,26 @@ class SparkContext(
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int = defaultMinSplits
- ) : RDD[(K, V)] = {
- val conf = new JobConf(hadoopConfiguration)
- FileInputFormat.setInputPaths(conf, path)
- new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
+ ): RDD[(K, V)] = {
+ val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
+ hadoopFile(path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
+ }
+
+ /**
+ * Get an RDD for a Hadoop file with an arbitrary InputFormat. Accepts a Hadoop Configuration
+ * that has already been broadcast, assuming that it's safe to use it to construct a
+ * HadoopFileRDD (i.e., except for the file 'path', all other configuration properties can be reused).
+ */
+ def hadoopFile[K, V](
+ path: String,
+ confBroadcast: Broadcast[SerializableWritable[Configuration]],
+ inputFormatClass: Class[_ <: InputFormat[K, V]],
+ keyClass: Class[K],
+ valueClass: Class[V],
+ minSplits: Int
+ ): RDD[(K, V)] = {
+ new HadoopFileRDD(
+ this, path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
}
/**
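
The SparkContext changes above stop shipping a ~10 KB JobConf inside every task: hadoopRDD() now broadcasts the JobConf itself and builds a HadoopDatasetRDD, while the new hadoopFile() overload accepts an already-broadcast Configuration so many file reads can share one broadcast. A minimal usage sketch of that overload, assuming a local SparkContext; the master URL, app name, and input paths are placeholders and the code is illustrative only, not part of the patch:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.{SerializableWritable, SparkContext}

val sc = new SparkContext("local", "broadcast-conf-example")

// Broadcast the (potentially large) Hadoop Configuration a single time.
val confBroadcast = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))

// Reuse the same broadcast for every file; only the input path differs per RDD.
val rdds = Seq("/data/logs/part-00000", "/data/logs/part-00001").map { path =>
  sc.hadoopFile(path, confBroadcast, classOf[TextInputFormat],
    classOf[LongWritable], classOf[Text], sc.defaultMinSplits)
}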
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 0a5f4c368f..f416b95afb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -16,6 +16,9 @@
*/
package org.apache.spark.deploy
+
+import com.google.common.collect.MapMaker
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
@@ -24,6 +27,9 @@ import org.apache.hadoop.mapred.JobConf
* Contains util methods to interact with Hadoop from spark.
*/
class SparkHadoopUtil {
+ // A general map for metadata needed during HadoopRDD split computation (e.g., HadoopFileRDD uses
+ // this to cache JobConfs).
+ private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
// Return an appropriate (subclass of) Configuration. Creating a config can initialize some Hadoop subsystems.
def newConfiguration(): Configuration = new Configuration()
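
The hadoopJobMetadata map introduced here holds per-executor metadata (cached JobConfs and InputFormats, as the HadoopRDD changes below show) with softly-referenced values, so the JVM can reclaim entries under memory pressure instead of pinning them for the executor's lifetime. A small self-contained sketch of the same construction, assuming Guava is on the classpath (newer Guava releases moved soft-value caches to CacheBuilder; the MapMaker call below matches the one used in this patch):

import java.util.concurrent.ConcurrentMap
import com.google.common.collect.MapMaker
import org.apache.hadoop.mapred.JobConf

// A concurrent map whose values are held via SoftReferences: the GC may drop
// entries when memory is tight, so readers must tolerate cache misses.
val metadata: ConcurrentMap[String, Any] = new MapMaker().softValues().makeMap[String, Any]()

metadata.put("rdd_0_job_conf", new JobConf())

// A later lookup may return null if the entry was collected in the meantime.
val cached = Option(metadata.get("rdd_0_job_conf")).map(_.asInstanceOf[JobConf])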
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 2cb6734e41..dd9fc7b79a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -19,6 +19,7 @@ package org.apache.spark.rdd
import java.io.EOFException
+import org.apache.hadoop.mapred.FileInputFormat
import org.apache.hadoop.mapred.InputFormat
import org.apache.hadoop.mapred.InputSplit
import org.apache.hadoop.mapred.JobConf
@@ -26,10 +27,57 @@ import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.ReflectionUtils
-import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv,
+ TaskContext}
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.util.NextIterator
import org.apache.hadoop.conf.{Configuration, Configurable}
+/**
+ * An RDD that reads a file (or multiple files) from Hadoop (e.g. files in HDFS, the local file
+ * system, or S3).
+ * This accepts a general, broadcasted Hadoop Configuration because those tend to remain the same
+ * across multiple reads; the 'path' is the only variable that is different across new JobConfs
+ * created from the Configuration.
+ */
+class HadoopFileRDD[K, V](
+ sc: SparkContext,
+ path: String,
+ hadoopConfBroadcast: Broadcast[SerializableWritable[Configuration]],
+ inputFormatClass: Class[_ <: InputFormat[K, V]],
+ keyClass: Class[K],
+ valueClass: Class[V],
+ minSplits: Int)
+ extends HadoopRDD[K, V](sc, inputFormatClass, keyClass, valueClass, minSplits) {
+
+ private val jobConfCacheKey = "rdd_%d_job_conf".format(id)
+
+ override def getJobConf(): JobConf = {
+ if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+ return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+ } else {
+ val newJobConf = new JobConf(hadoopConfBroadcast.value.value)
+ FileInputFormat.setInputPaths(newJobConf, path)
+ HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+ return newJobConf
+ }
+ }
+}
+
+/**
+ * An RDD that reads a Hadoop dataset as specified by a JobConf (e.g. tables in HBase).
+ */
+class HadoopDatasetRDD[K, V](
+ sc: SparkContext,
+ confBroadcast: Broadcast[SerializableWritable[JobConf]],
+ inputFormatClass: Class[_ <: InputFormat[K, V]],
+ keyClass: Class[K],
+ valueClass: Class[V],
+ minSplits: Int)
+ extends HadoopRDD[K, V](sc, inputFormatClass, keyClass, valueClass, minSplits) {
+
+ override def getJobConf(): JobConf = confBroadcast.value.value
+}
/**
* A Spark split class that wraps around a Hadoop InputSplit.
@@ -45,29 +93,41 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
}
/**
- * An RDD that reads a Hadoop dataset as specified by a JobConf (e.g. files in HDFS, the local file
- * system, or S3, tables in HBase, etc).
+ * A base class that provides core functionality for reading data partitions stored in Hadoop.
*/
-class HadoopRDD[K, V](
+abstract class HadoopRDD[K, V](
sc: SparkContext,
- @transient conf: JobConf,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int)
extends RDD[(K, V)](sc, Nil) with Logging {
- // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
- private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
+ private val inputFormatCacheKey = "rdd_%d_input_format".format(id)
+
+ // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
+ protected def getJobConf(): JobConf
+
+ def getInputFormat(conf: JobConf): InputFormat[K, V] = {
+ if (HadoopRDD.containsCachedMetadata(inputFormatCacheKey)) {
+ return HadoopRDD.getCachedMetadata(inputFormatCacheKey).asInstanceOf[InputFormat[K, V]]
+ }
+ val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
+ .asInstanceOf[InputFormat[K, V]]
+ if (newInputFormat.isInstanceOf[Configurable]) {
+ newInputFormat.asInstanceOf[Configurable].setConf(conf)
+ }
+ HadoopRDD.putCachedMetadata(inputFormatCacheKey, newInputFormat)
+ return newInputFormat
+ }
override def getPartitions: Array[Partition] = {
- val env = SparkEnv.get
- env.hadoop.addCredentials(conf)
- val inputFormat = createInputFormat(conf)
+ val jobConf = getJobConf()
+ val inputFormat = getInputFormat(jobConf)
if (inputFormat.isInstanceOf[Configurable]) {
- inputFormat.asInstanceOf[Configurable].setConf(conf)
+ inputFormat.asInstanceOf[Configurable].setConf(jobConf)
}
- val inputSplits = inputFormat.getSplits(conf, minSplits)
+ val inputSplits = inputFormat.getSplits(jobConf, minSplits)
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
@@ -75,22 +135,14 @@ class HadoopRDD[K, V](
array
}
- def createInputFormat(conf: JobConf): InputFormat[K, V] = {
- ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
- .asInstanceOf[InputFormat[K, V]]
- }
-
override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] {
val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
var reader: RecordReader[K, V] = null
- val conf = confBroadcast.value.value
- val fmt = createInputFormat(conf)
- if (fmt.isInstanceOf[Configurable]) {
- fmt.asInstanceOf[Configurable].setConf(conf)
- }
- reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
+ val jobConf = getJobConf()
+ val inputFormat = getInputFormat(jobConf)
+ reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback{ () => closeIfNeeded() }
@@ -127,5 +179,14 @@ class HadoopRDD[K, V](
// Do nothing. Hadoop RDD should not be checkpointed.
}
- def getConf: Configuration = confBroadcast.value.value
+ def getConf: Configuration = getJobConf()
+}
+
+object HadoopRDD {
+ def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key)
+
+ def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key)
+
+ def putCachedMetadata(key: String, value: Any) =
+ SparkEnv.get.hadoop.hadoopJobMetadata.put(key, value)
}
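
With HadoopRDD now an abstract base class, the only obligation on a subclass is getJobConf(); split computation, record reading, and the per-executor metadata cache in the companion object are shared. A hypothetical sketch (not part of the patch) of a further subclass that rebuilds its JobConf from a small property map and caches it under its own key, mirroring what HadoopFileRDD does above:

import org.apache.hadoop.mapred.{InputFormat, JobConf}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.HadoopRDD

// Hypothetical subclass: builds the JobConf on each executor from a small map
// of properties shipped with the RDD, then caches it in HadoopRDD's metadata map.
class PropertiesHadoopRDD[K, V](
    sc: SparkContext,
    props: Map[String, String],
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minSplits: Int)
  extends HadoopRDD[K, V](sc, inputFormatClass, keyClass, valueClass, minSplits) {

  private val jobConfCacheKey = "rdd_%d_job_conf".format(id)

  override def getJobConf(): JobConf = {
    if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
      // A previous task on this executor already built the JobConf; reuse it.
      HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
    } else {
      val newJobConf = new JobConf()
      props.foreach { case (k, v) => newJobConf.set(k, v) }
      HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
      newJobConf
    }
  }
}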