aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/spark/rdd/NewHadoopRDD.scala')
-rw-r--r--core/src/main/scala/spark/rdd/NewHadoopRDD.scala19
1 files changed, 8 insertions, 11 deletions
diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
index 197ed5ea17..c3b155fcbd 100644
--- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
@@ -20,11 +20,12 @@ class NewHadoopSplit(rddId: Int, val index: Int, @transient rawSplit: InputSplit
}
class NewHadoopRDD[K, V](
- sc: SparkContext,
+ sc : SparkContext,
inputFormatClass: Class[_ <: InputFormat[K, V]],
- keyClass: Class[K], valueClass: Class[V],
+ keyClass: Class[K],
+ valueClass: Class[V],
@transient conf: Configuration)
- extends RDD[(K, V)](sc)
+ extends RDD[(K, V)](sc, Nil)
with HadoopMapReduceUtil {
// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
@@ -36,11 +37,9 @@ class NewHadoopRDD[K, V](
formatter.format(new Date())
}
- @transient
- private val jobId = new JobID(jobtrackerId, id)
+ @transient private val jobId = new JobID(jobtrackerId, id)
- @transient
- private val splits_ : Array[Split] = {
+ @transient private val splits_ : Array[Split] = {
val inputFormat = inputFormatClass.newInstance
val jobContext = newJobContext(conf, jobId)
val rawSplits = inputFormat.getSplits(jobContext).toArray
@@ -51,7 +50,7 @@ class NewHadoopRDD[K, V](
result
}
- override def splits = splits_
+ override def getSplits = splits_
override def compute(theSplit: Split, context: TaskContext) = new Iterator[(K, V)] {
val split = theSplit.asInstanceOf[NewHadoopSplit]
@@ -86,10 +85,8 @@ class NewHadoopRDD[K, V](
}
}
- override def preferredLocations(split: Split) = {
+ override def getPreferredLocations(split: Split) = {
val theSplit = split.asInstanceOf[NewHadoopSplit]
theSplit.serializableHadoopSplit.value.getLocations.filter(_ != "localhost")
}
-
- override val dependencies: List[Dependency[_]] = Nil
}