Diffstat (limited to 'core/src/main/scala/spark/rdd/NewHadoopRDD.scala')
-rw-r--r--  core/src/main/scala/spark/rdd/NewHadoopRDD.scala | 15
1 file changed, 5 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
index dcbceab246..7a1a0fb87d 100644
--- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
@@ -2,13 +2,7 @@ package spark.rdd
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapreduce.InputFormat
-import org.apache.hadoop.mapreduce.InputSplit
-import org.apache.hadoop.mapreduce.JobContext
-import org.apache.hadoop.mapreduce.JobID
-import org.apache.hadoop.mapreduce.RecordReader
-import org.apache.hadoop.mapreduce.TaskAttemptContext
-import org.apache.hadoop.mapreduce.TaskAttemptID
+import org.apache.hadoop.mapreduce._
import java.util.Date
import java.text.SimpleDateFormat
@@ -33,7 +27,8 @@ class NewHadoopRDD[K, V](
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K], valueClass: Class[V],
@transient conf: Configuration)
- extends RDD[(K, V)](sc) {
+ extends RDD[(K, V)](sc)
+ with HadoopMapReduceUtil {
// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
val confBroadcast = sc.broadcast(new SerializableWritable(conf))
@@ -50,7 +45,7 @@ class NewHadoopRDD[K, V](
@transient
private val splits_ : Array[Split] = {
val inputFormat = inputFormatClass.newInstance
- val jobContext = new JobContext(conf, jobId)
+ val jobContext = newJobContext(conf, jobId)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Split](rawSplits.size)
for (i <- 0 until rawSplits.size) {
@@ -65,7 +60,7 @@ class NewHadoopRDD[K, V](
val split = theSplit.asInstanceOf[NewHadoopSplit]
val conf = confBroadcast.value.value
val attemptId = new TaskAttemptID(jobtrackerId, id, true, split.index, 0)
- val context = new TaskAttemptContext(conf, attemptId)
+ val context = newTaskAttemptContext(conf, attemptId)
val format = inputFormatClass.newInstance
val reader = format.createRecordReader(split.serializableHadoopSplit.value, context)
reader.initialize(split.serializableHadoopSplit.value, context)
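The two direct constructor calls removed above (`new JobContext(...)` and `new TaskAttemptContext(...)`) are replaced by `newJobContext` and `newTaskAttemptContext`, which come from the newly mixed-in `HadoopMapReduceUtil` trait. That trait is not part of this diff; the sketch below is only a hypothetical illustration of how such helpers could be written so the same code path works whether the Hadoop version on the classpath exposes `JobContext`/`TaskAttemptContext` as concrete classes (Hadoop 1.x) or as interfaces backed by `*Impl` classes (Hadoop 2.x). The class names and reflection details are assumptions, not the actual Spark implementation.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{JobContext, JobID, TaskAttemptContext, TaskAttemptID}

// Hypothetical sketch: resolve the concrete context class at runtime so this
// compiles and runs against either Hadoop API generation.
trait HadoopMapReduceUtil {
  def newJobContext(conf: Configuration, jobId: JobID): JobContext = {
    val klass = firstAvailableClass(
      "org.apache.hadoop.mapreduce.task.JobContextImpl",   // assumed Hadoop 2.x class
      "org.apache.hadoop.mapreduce.JobContext")             // Hadoop 1.x concrete class
    val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[JobID])
    ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
  }

  def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = {
    val klass = firstAvailableClass(
      "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl",  // assumed Hadoop 2.x class
      "org.apache.hadoop.mapreduce.TaskAttemptContext")            // Hadoop 1.x concrete class
    val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[TaskAttemptID])
    ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
  }

  // Try the first class name, fall back to the second if it is not on the classpath.
  private def firstAvailableClass(first: String, second: String): Class[_] =
    try {
      Class.forName(first)
    } catch {
      case _: ClassNotFoundException => Class.forName(second)
    }
}

With a trait along these lines mixed into NewHadoopRDD, the `newJobContext(conf, jobId)` and `newTaskAttemptContext(conf, attemptId)` calls in the hunks above resolve to these helpers, so the RDD no longer names the context constructors directly and is insulated from the class-versus-interface change between Hadoop versions.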