authorHarvey Feng <harvey@databricks.com>2013-09-28 18:32:41 -0700
committerHarvey Feng <harvey@databricks.com>2013-09-29 20:08:03 -0700
commit7d06bdde1d1364dcbef67079b23f6e9777a2de2e (patch)
tree44a3be5f8f2b152323f8057abda883f52541be7a /core
parent417085716ad0b451e4ab54cd3391a7ec2e1e3c5e (diff)
Merge HadoopDatasetRDD into HadoopRDD.

HadoopRDD is no longer abstract: it now takes a broadcast Hadoop Configuration directly and gains a secondary constructor that accepts a JobConf and broadcasts it itself, so the JobConf-specific HadoopDatasetRDD subclass is removed. SparkContext.hadoopRDD constructs a HadoopRDD directly, and HadoopFileRDD passes its broadcast Configuration up to the shared superclass.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala |  9
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 58
2 files changed, 40 insertions(+), 27 deletions(-)
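
For orientation, the public entry points touched in the SparkContext.scala hunk keep their signatures; only the RDD construction behind them changes. A minimal usage sketch, assuming a local master and a made-up HDFS path (neither comes from this commit):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
import org.apache.spark.SparkContext

object HadoopRDDUsageSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "hadoop-rdd-usage-sketch")

    // JobConf-based read (sc.hadoopRDD): previously backed by HadoopDatasetRDD,
    // now built directly as a HadoopRDD via its new JobConf constructor.
    val jobConf = new JobConf()
    FileInputFormat.setInputPaths(jobConf, "hdfs:///tmp/example-input") // hypothetical path
    val fromJobConf = sc.hadoopRDD(jobConf, classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text])

    // Path-based read (sc.hadoopFile): still backed by HadoopFileRDD, which
    // broadcasts the (potentially ~10 KB) Hadoop configuration once per RDD.
    val fromPath = sc.hadoopFile("hdfs:///tmp/example-input", classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text])

    // Both RDDs are lazy; nothing is read until an action such as count() runs.
    sc.stop()
  }
}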
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 11e92945ec..ada1037bd6 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -332,17 +332,15 @@ class SparkContext(
* etc).
*/
def hadoopRDD[K, V](
- conf: JobConf,
+ jobConf: JobConf,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int = defaultMinSplits
): RDD[(K, V)] = {
// Add necessary security credentials to the JobConf before broadcasting it.
- SparkEnv.get.hadoop.addCredentials(conf)
- // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it.
- val confBroadcast = broadcast(new SerializableWritable(conf))
- new HadoopDatasetRDD(this, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
+ SparkEnv.get.hadoop.addCredentials(jobConf)
+ new HadoopRDD(this, jobConf, inputFormatClass, keyClass, valueClass, minSplits)
}
/** Get an RDD for a Hadoop file with an arbitrary InputFormat */
@@ -353,6 +351,7 @@ class SparkContext(
valueClass: Class[V],
minSplits: Int = defaultMinSplits
): RDD[(K, V)] = {
+ // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
hadoopFile(path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
}
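
The HadoopRDD.scala hunk below removes HadoopDatasetRDD and makes HadoopRDD a concrete class with two construction paths: a primary constructor taking an already-broadcast Configuration (used by HadoopFileRDD) and a secondary constructor taking a JobConf that it broadcasts itself (used by SparkContext.hadoopRDD). A hedged sketch of the two paths, assuming HadoopRDD, HadoopFileRDD, and SerializableWritable are reachable from user code at this point in the codebase, with a made-up input path:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
import org.apache.spark.{SerializableWritable, SparkContext}
import org.apache.spark.rdd.{HadoopFileRDD, HadoopRDD}

object MergedHadoopRDDSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "merged-hadoop-rdd-sketch")

    // 1) JobConf path: the new secondary constructor broadcasts the JobConf itself,
    //    mirroring what SparkContext.hadoopRDD now does after adding credentials.
    val jobConf = new JobConf()
    val datasetRdd = new HadoopRDD(sc, jobConf,
      classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 2)

    // 2) Broadcast-Configuration path: HadoopFileRDD hands a broadcast Configuration
    //    to the primary constructor and builds a path-specific JobConf lazily,
    //    caching it under its jobConfCacheKey.
    val confBroadcast = sc.broadcast(new SerializableWritable(new Configuration()))
    val fileRdd = new HadoopFileRDD(sc, "hdfs:///tmp/example-input", confBroadcast,
      classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 2)

    // Both RDDs are lazy; no Hadoop splits are computed until an action runs.
    sc.stop()
  }
}

As the merged getJobConf in the hunk shows, when the broadcast value is already a JobConf (the secondary-constructor case) it is returned directly; otherwise the Configuration is wrapped in a new JobConf and cached under jobConfCacheKey.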
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index dd9fc7b79a..404532dad4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -43,20 +43,18 @@ import org.apache.hadoop.conf.{Configuration, Configurable}
class HadoopFileRDD[K, V](
sc: SparkContext,
path: String,
- hadoopConfBroadcast: Broadcast[SerializableWritable[Configuration]],
+ confBroadcast: Broadcast[SerializableWritable[Configuration]],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int)
- extends HadoopRDD[K, V](sc, inputFormatClass, keyClass, valueClass, minSplits) {
-
- private val jobConfCacheKey = "rdd_%d_job_conf".format(id)
+ extends HadoopRDD[K, V](sc, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits) {
override def getJobConf(): JobConf = {
if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
} else {
- val newJobConf = new JobConf(hadoopConfBroadcast.value.value)
+ val newJobConf = new JobConf(confBroadcast.value.value)
FileInputFormat.setInputPaths(newJobConf, path)
HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
return newJobConf
@@ -65,21 +63,6 @@ class HadoopFileRDD[K, V](
}
/**
- * An RDD that reads a Hadoop dataset as specified by a JobConf (e.g. tables in HBase).
- */
-class HadoopDatasetRDD[K, V](
- sc: SparkContext,
- confBroadcast: Broadcast[SerializableWritable[JobConf]],
- inputFormatClass: Class[_ <: InputFormat[K, V]],
- keyClass: Class[K],
- valueClass: Class[V],
- minSplits: Int)
- extends HadoopRDD[K, V](sc, inputFormatClass, keyClass, valueClass, minSplits) {
-
- override def getJobConf(): JobConf = confBroadcast.value.value
-}
-
-/**
* A Spark split class that wraps around a Hadoop InputSplit.
*/
private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSplit)
@@ -95,18 +78,49 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
/**
* A base class that provides core functionality for reading data partitions stored in Hadoop.
*/
-abstract class HadoopRDD[K, V](
+class HadoopRDD[K, V](
sc: SparkContext,
+ confBroadcast: Broadcast[SerializableWritable[Configuration]],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int)
extends RDD[(K, V)](sc, Nil) with Logging {
+ def this(
+ sc: SparkContext,
+ jobConf: JobConf,
+ inputFormatClass: Class[_ <: InputFormat[K, V]],
+ keyClass: Class[K],
+ valueClass: Class[V],
+ minSplits: Int) = {
+ this(
+ sc,
+ sc.broadcast(new SerializableWritable(jobConf))
+ .asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
+ inputFormatClass,
+ keyClass,
+ valueClass,
+ minSplits)
+ }
+
+ protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
+
private val inputFormatCacheKey = "rdd_%d_input_format".format(id)
// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
- protected def getJobConf(): JobConf
+ protected def getJobConf(): JobConf = {
+ val conf: Configuration = confBroadcast.value.value
+ if (conf.isInstanceOf[JobConf]) {
+ return conf.asInstanceOf[JobConf]
+ } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+ return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+ } else {
+ val newJobConf = new JobConf(confBroadcast.value.value)
+ HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+ return newJobConf
+ }
+ }
def getInputFormat(conf: JobConf): InputFormat[K, V] = {
if (HadoopRDD.containsCachedMetadata(inputFormatCacheKey)) {