diff options
8 files changed, 53 insertions, 22 deletions
diff --git a/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala new file mode 100644 index 0000000000..ca9f7219de --- /dev/null +++ b/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala @@ -0,0 +1,7 @@ +package org.apache.hadoop.mapred + +trait HadoopMapRedUtil { + def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContext(conf, jobId) + + def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId) +} diff --git a/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala new file mode 100644 index 0000000000..de7b0f81e3 --- /dev/null +++ b/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala @@ -0,0 +1,9 @@ +package org.apache.hadoop.mapreduce + +import org.apache.hadoop.conf.Configuration + +trait HadoopMapReduceUtil { + def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContext(conf, jobId) + + def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId) +} diff --git a/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala new file mode 100644 index 0000000000..35300cea58 --- /dev/null +++ b/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala @@ -0,0 +1,7 @@ +package org.apache.hadoop.mapred + +trait HadoopMapRedUtil { + def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContextImpl(conf, jobId) + + def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId) +} diff --git a/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala new file mode 100644 index 0000000000..7afdbff320 --- /dev/null +++ b/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala @@ -0,0 +1,10 @@ +package org.apache.hadoop.mapreduce + +import org.apache.hadoop.conf.Configuration +import task.{TaskAttemptContextImpl, JobContextImpl} + +trait HadoopMapReduceUtil { + def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContextImpl(conf, jobId) + + def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId) +} diff --git a/core/src/main/scala/spark/HadoopWriter.scala b/core/src/main/scala/spark/HadoopWriter.scala index ffe0f3c4a1..afcf9f6db4 100644 --- a/core/src/main/scala/spark/HadoopWriter.scala +++ b/core/src/main/scala/spark/HadoopWriter.scala @@ -23,7 +23,7 @@ import spark.SerializableWritable * Saves the RDD using a JobConf, which should contain an output key class, an output value class, * a filename to write to, etc, exactly like in a Hadoop MapReduce job. */ -class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializable { +class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRedUtil with Serializable { private val now = new Date() private val conf = new SerializableWritable(jobConf) @@ -129,14 +129,14 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializabl private def getJobContext(): JobContext = { if (jobContext == null) { - jobContext = new JobContext(conf.value, jID.value) + jobContext = newJobContext(conf.value, jID.value) } return jobContext } private def getTaskContext(): TaskAttemptContext = { if (taskContext == null) { - taskContext = new TaskAttemptContext(conf.value, taID.value) + taskContext = newTaskAttemptContext(conf.value, taID.value) } return taskContext } diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index 0240fd95c7..d693b4e820 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -28,11 +28,7 @@ import org.apache.hadoop.mapred.SequenceFileOutputFormat import org.apache.hadoop.mapred.TextOutputFormat import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat} -import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} -import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter} -import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob} -import org.apache.hadoop.mapreduce.TaskAttemptID -import org.apache.hadoop.mapreduce.TaskAttemptContext +import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, HadoopMapReduceUtil, TaskAttemptID, TaskAttemptContext} import spark.partial.BoundedDouble import spark.partial.PartialResult @@ -46,6 +42,7 @@ import spark.SparkContext._ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( self: RDD[(K, V)]) extends Logging + with HadoopMapReduceUtil with Serializable { /** @@ -506,7 +503,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( /* "reduce task" <split #> <attempt # = spark task #> */ val attemptId = new TaskAttemptID(jobtrackerID, stageId, false, context.splitId, attemptNumber) - val hadoopContext = new TaskAttemptContext(wrappedConf.value, attemptId) + val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId) val format = outputFormatClass.newInstance val committer = format.getOutputCommitter(hadoopContext) committer.setupTask(hadoopContext) @@ -525,7 +522,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( * setupJob/commitJob, so we just use a dummy "map" task. */ val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, true, 0, 0) - val jobTaskContext = new TaskAttemptContext(wrappedConf.value, jobAttemptId) + val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId) val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext) jobCommitter.setupJob(jobTaskContext) val count = self.context.runJob(self, writeShard _).sum diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala index dcbceab246..7a1a0fb87d 100644 --- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala @@ -2,13 +2,7 @@ package spark.rdd import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.Writable -import org.apache.hadoop.mapreduce.InputFormat -import org.apache.hadoop.mapreduce.InputSplit -import org.apache.hadoop.mapreduce.JobContext -import org.apache.hadoop.mapreduce.JobID -import org.apache.hadoop.mapreduce.RecordReader -import org.apache.hadoop.mapreduce.TaskAttemptContext -import org.apache.hadoop.mapreduce.TaskAttemptID +import org.apache.hadoop.mapreduce._ import java.util.Date import java.text.SimpleDateFormat @@ -33,7 +27,8 @@ class NewHadoopRDD[K, V]( inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], @transient conf: Configuration) - extends RDD[(K, V)](sc) { + extends RDD[(K, V)](sc) + with HadoopMapReduceUtil { // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it val confBroadcast = sc.broadcast(new SerializableWritable(conf)) @@ -50,7 +45,7 @@ class NewHadoopRDD[K, V]( @transient private val splits_ : Array[Split] = { val inputFormat = inputFormatClass.newInstance - val jobContext = new JobContext(conf, jobId) + val jobContext = newJobContext(conf, jobId) val rawSplits = inputFormat.getSplits(jobContext).toArray val result = new Array[Split](rawSplits.size) for (i <- 0 until rawSplits.size) { @@ -65,7 +60,7 @@ class NewHadoopRDD[K, V]( val split = theSplit.asInstanceOf[NewHadoopSplit] val conf = confBroadcast.value.value val attemptId = new TaskAttemptID(jobtrackerId, id, true, split.index, 0) - val context = new TaskAttemptContext(conf, attemptId) + val context = newTaskAttemptContext(conf, attemptId) val format = inputFormatClass.newInstance val reader = format.createRecordReader(split.serializableHadoopSplit.value, context) reader.initialize(split.serializableHadoopSplit.value, context) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index c9cf17d90a..e165ba3ac1 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -11,6 +11,11 @@ object SparkBuild extends Build { // Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or // "1.0.3" for Apache releases, or "0.20.2-cdh3u5" for Cloudera Hadoop. val HADOOP_VERSION = "0.20.205.0" + val HADOOP_MAJOR_VERSION = "1" + + // For Hadoop 2 versions such as "2.0.0-mr1-cdh4.1.1", set the HADOOP_MAJOR_VERSION to "2" + //val HADOOP_VERSION = "2.0.0-mr1-cdh4.1.1" + //val HADOOP_MAJOR_VERSION = "2" lazy val root = Project("root", file("."), settings = rootSettings) aggregate(core, repl, examples, bagel) @@ -108,7 +113,7 @@ object SparkBuild extends Build { "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/", "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/", "Spray Repository" at "http://repo.spray.cc/", - "Cloudera Repository" at "http://repository.cloudera.com/artifactory/cloudera-repos/" + "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/" ), libraryDependencies ++= Seq( @@ -129,7 +134,8 @@ object SparkBuild extends Build { "cc.spray" % "spray-can" % "1.0-M2.1", "cc.spray" % "spray-server" % "1.0-M2.1", "org.apache.mesos" % "mesos" % "0.9.0-incubating" - ) + ) ++ (if (HADOOP_MAJOR_VERSION == "2") Some("org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION) else None).toSeq, + unmanagedSourceDirectories in Compile <+= baseDirectory{ _ / ("src/hadoop" + HADOOP_MAJOR_VERSION + "/scala") } ) ++ assemblySettings ++ extraAssemblySettings ++ Twirl.settings def rootSettings = sharedSettings ++ Seq( |