From 6798a09df84fb97e196c84d55cf3e21ad676871f Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Sun, 7 Apr 2013 17:47:38 +0530 Subject: Add support for building against hadoop2-yarn : adding new maven profile for it --- bagel/pom.xml | 37 +++++++++++ core/pom.xml | 62 +++++++++++++++++++ .../apache/hadoop/mapred/HadoopMapRedUtil.scala | 3 + .../hadoop/mapreduce/HadoopMapReduceUtil.scala | 3 + .../apache/hadoop/mapred/HadoopMapRedUtil.scala | 13 ++++ .../hadoop/mapreduce/HadoopMapReduceUtil.scala | 13 ++++ .../apache/hadoop/mapred/HadoopMapRedUtil.scala | 3 + .../hadoop/mapreduce/HadoopMapReduceUtil.scala | 3 + core/src/main/scala/spark/PairRDDFunctions.scala | 5 +- core/src/main/scala/spark/rdd/NewHadoopRDD.scala | 2 +- examples/pom.xml | 43 +++++++++++++ pom.xml | 54 ++++++++++++++++ project/SparkBuild.scala | 34 +++++++++-- repl-bin/pom.xml | 50 +++++++++++++++ repl/pom.xml | 71 ++++++++++++++++++++++ streaming/pom.xml | 37 +++++++++++ 16 files changed, 424 insertions(+), 9 deletions(-) create mode 100644 core/src/hadoop2-yarn/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala create mode 100644 core/src/hadoop2-yarn/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala diff --git a/bagel/pom.xml b/bagel/pom.xml index 510cff4669..89282161ea 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -102,5 +102,42 @@ + + hadoop2-yarn + + + org.spark-project + spark-core + ${project.version} + hadoop2-yarn + + + org.apache.hadoop + hadoop-client + provided + + + org.apache.hadoop + hadoop-yarn-api + provided + + + org.apache.hadoop + hadoop-yarn-common + provided + + + + + + org.apache.maven.plugins + maven-jar-plugin + + hadoop2-yarn + + + + + diff --git a/core/pom.xml b/core/pom.xml index fe9c803728..9baa447662 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -279,5 +279,67 @@ + + hadoop2-yarn + + + org.apache.hadoop + hadoop-client + provided + + + org.apache.hadoop + hadoop-yarn-api + provided + + + org.apache.hadoop + hadoop-yarn-common + provided + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-source + generate-sources + + add-source + + + + src/main/scala + src/hadoop2-yarn/scala + + + + + add-scala-test-sources + generate-test-sources + + add-test-source + + + + src/test/scala + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + hadoop2-yarn + + + + + diff --git a/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala index ca9f7219de..f286f2cf9c 100644 --- a/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala +++ b/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala @@ -4,4 +4,7 @@ trait HadoopMapRedUtil { def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContext(conf, jobId) def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId) + + def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier, + jobId, isMap, taskId, attemptId) } diff --git a/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala index de7b0f81e3..264d421d14 100644 --- a/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala +++ b/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala @@ -6,4 +6,7 @@ trait HadoopMapReduceUtil { def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContext(conf, jobId) def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId) + + def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier, + jobId, isMap, taskId, attemptId) } diff --git a/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala new file mode 100644 index 0000000000..875c0a220b --- /dev/null +++ b/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala @@ -0,0 +1,13 @@ + +package org.apache.hadoop.mapred + +import org.apache.hadoop.mapreduce.TaskType + +trait HadoopMapRedUtil { + def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContextImpl(conf, jobId) + + def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId) + + def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = + new TaskAttemptID(jtIdentifier, jobId, if (isMap) TaskType.MAP else TaskType.REDUCE, taskId, attemptId) +} diff --git a/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala new file mode 100644 index 0000000000..8bc6fb6dea --- /dev/null +++ b/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala @@ -0,0 +1,13 @@ +package org.apache.hadoop.mapreduce + +import org.apache.hadoop.conf.Configuration +import task.{TaskAttemptContextImpl, JobContextImpl} + +trait HadoopMapReduceUtil { + def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContextImpl(conf, jobId) + + def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId) + + def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = + new TaskAttemptID(jtIdentifier, jobId, if (isMap) TaskType.MAP else TaskType.REDUCE, taskId, attemptId) +} diff --git a/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala index 35300cea58..a0652d7fc7 100644 --- a/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala +++ b/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala @@ -4,4 +4,7 @@ trait HadoopMapRedUtil { def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContextImpl(conf, jobId) def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId) + + def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier, + jobId, isMap, taskId, attemptId) } diff --git a/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala index 7afdbff320..7fdbe322fd 100644 --- a/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala +++ b/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala @@ -7,4 +7,7 @@ trait HadoopMapReduceUtil { def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContextImpl(conf, jobId) def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId) + + def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier, + jobId, isMap, taskId, attemptId) } diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index 07efba9e8d..39469fa3c8 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -545,8 +545,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( // around by taking a mod. We expect that no task will be attempted 2 billion times. val attemptNumber = (context.attemptId % Int.MaxValue).toInt /* "reduce task" */ - val attemptId = new TaskAttemptID(jobtrackerID, - stageId, false, context.splitId, attemptNumber) + val attemptId = newTaskAttemptID(jobtrackerID, stageId, false, context.splitId, attemptNumber) val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId) val format = outputFormatClass.newInstance val committer = format.getOutputCommitter(hadoopContext) @@ -565,7 +564,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( * however we're only going to use this local OutputCommitter for * setupJob/commitJob, so we just use a dummy "map" task. */ - val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, true, 0, 0) + val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, true, 0, 0) val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId) val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext) jobCommitter.setupJob(jobTaskContext) diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala index bdd974590a..901d01ef30 100644 --- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala @@ -57,7 +57,7 @@ class NewHadoopRDD[K, V]( override def compute(theSplit: Partition, context: TaskContext) = new Iterator[(K, V)] { val split = theSplit.asInstanceOf[NewHadoopPartition] val conf = confBroadcast.value.value - val attemptId = new TaskAttemptID(jobtrackerId, id, true, split.index, 0) + val attemptId = newTaskAttemptID(jobtrackerId, id, true, split.index, 0) val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId) val format = inputFormatClass.newInstance if (format.isInstanceOf[Configurable]) { diff --git a/examples/pom.xml b/examples/pom.xml index 39cc47c709..9594257ad4 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -118,5 +118,48 @@ + + hadoop2-yarn + + + org.spark-project + spark-core + ${project.version} + hadoop2-yarn + + + org.spark-project + spark-streaming + ${project.version} + hadoop2-yarn + + + org.apache.hadoop + hadoop-client + provided + + + org.apache.hadoop + hadoop-yarn-api + provided + + + org.apache.hadoop + hadoop-yarn-common + provided + + + + + + org.apache.maven.plugins + maven-jar-plugin + + hadoop2-yarn + + + + + diff --git a/pom.xml b/pom.xml index 08d1fc12e0..b3134a957d 100644 --- a/pom.xml +++ b/pom.xml @@ -558,5 +558,59 @@ + + + hadoop2-yarn + + 2 + 2.0.3-alpha + + + + + maven-root + Maven root repository + http://repo1.maven.org/maven2/ + + true + + + false + + + + + + + + + org.apache.hadoop + hadoop-client + ${yarn.version} + + + org.apache.hadoop + hadoop-yarn-api + ${yarn.version} + + + org.apache.hadoop + hadoop-yarn-common + ${yarn.version} + + + + org.apache.avro + avro + 1.7.1.cloudera.2 + + + org.apache.avro + avro-ipc + 1.7.1.cloudera.2 + + + + diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 5f378b2398..f041930b4e 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -1,3 +1,4 @@ + import sbt._ import sbt.Classpaths.publishTask import Keys._ @@ -10,12 +11,18 @@ import twirl.sbt.TwirlPlugin._ object SparkBuild extends Build { // Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or // "1.0.4" for Apache releases, or "0.20.2-cdh3u5" for Cloudera Hadoop. - val HADOOP_VERSION = "1.0.4" - val HADOOP_MAJOR_VERSION = "1" + //val HADOOP_VERSION = "1.0.4" + //val HADOOP_MAJOR_VERSION = "1" // For Hadoop 2 versions such as "2.0.0-mr1-cdh4.1.1", set the HADOOP_MAJOR_VERSION to "2" //val HADOOP_VERSION = "2.0.0-mr1-cdh4.1.1" //val HADOOP_MAJOR_VERSION = "2" + //val HADOOP_YARN = false + + // For Hadoop 2 YARN support + val HADOOP_VERSION = "2.0.3-alpha" + val HADOOP_MAJOR_VERSION = "2" + val HADOOP_YARN = true lazy val root = Project("root", file("."), settings = rootSettings) aggregate(core, repl, examples, bagel, streaming) @@ -129,7 +136,6 @@ object SparkBuild extends Build { "org.slf4j" % "slf4j-api" % slf4jVersion, "org.slf4j" % "slf4j-log4j12" % slf4jVersion, "com.ning" % "compress-lzf" % "0.8.4", - "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION, "asm" % "asm-all" % "3.3.1", "com.google.protobuf" % "protobuf-java" % "2.4.1", "de.javakaffee" % "kryo-serializers" % "0.22", @@ -142,8 +148,26 @@ object SparkBuild extends Build { "cc.spray" % "spray-server" % "1.0-M2.1", "cc.spray" %% "spray-json" % "1.1.1", "org.apache.mesos" % "mesos" % "0.9.0-incubating" - ) ++ (if (HADOOP_MAJOR_VERSION == "2") Some("org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION) else None).toSeq, - unmanagedSourceDirectories in Compile <+= baseDirectory{ _ / ("src/hadoop" + HADOOP_MAJOR_VERSION + "/scala") } + ) ++ ( + if (HADOOP_MAJOR_VERSION == "2") { + if (HADOOP_YARN) { + Seq( + "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION, + "org.apache.hadoop" % "hadoop-yarn-api" % HADOOP_VERSION, + "org.apache.hadoop" % "hadoop-yarn-common" % HADOOP_VERSION + ) + } else { + Seq( + "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION, + "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION + ) + } + } else { + Seq("org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION) + }), + unmanagedSourceDirectories in Compile <+= baseDirectory{ _ / + ( if (HADOOP_YARN && HADOOP_MAJOR_VERSION == "2") "src/hadoop2-yarn/scala" else "src/hadoop" + HADOOP_MAJOR_VERSION + "/scala" ) + } ) ++ assemblySettings ++ extraAssemblySettings ++ Twirl.settings def rootSettings = sharedSettings ++ Seq( diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml index dd720e2291..f9d84fd3c4 100644 --- a/repl-bin/pom.xml +++ b/repl-bin/pom.xml @@ -153,6 +153,56 @@ + + hadoop2-yarn + + hadoop2-yarn + + + + org.spark-project + spark-core + ${project.version} + hadoop2-yarn + + + org.spark-project + spark-bagel + ${project.version} + hadoop2-yarn + runtime + + + org.spark-project + spark-examples + ${project.version} + hadoop2-yarn + runtime + + + org.spark-project + spark-repl + ${project.version} + hadoop2-yarn + runtime + + + org.apache.hadoop + hadoop-client + provided + + + org.apache.hadoop + hadoop-yarn-api + provided + + + org.apache.hadoop + hadoop-yarn-common + provided + + + deb diff --git a/repl/pom.xml b/repl/pom.xml index a3e4606edc..1f885673f4 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -187,5 +187,76 @@ + + hadoop2-yarn + + hadoop2-yarn + + + + org.spark-project + spark-core + ${project.version} + hadoop2-yarn + + + org.spark-project + spark-bagel + ${project.version} + hadoop2-yarn + runtime + + + org.spark-project + spark-examples + ${project.version} + hadoop2-yarn + runtime + + + org.spark-project + spark-streaming + ${project.version} + hadoop2-yarn + runtime + + + org.apache.hadoop + hadoop-client + provided + + + org.apache.hadoop + hadoop-yarn-api + provided + + + org.apache.hadoop + hadoop-yarn-common + provided + + + org.apache.avro + avro + provided + + + org.apache.avro + avro-ipc + provided + + + + + + org.apache.maven.plugins + maven-jar-plugin + + hadoop2-yarn + + + + + diff --git a/streaming/pom.xml b/streaming/pom.xml index ec077e8089..fc2e211a42 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -149,5 +149,42 @@ + + hadoop2-yarn + + + org.spark-project + spark-core + ${project.version} + hadoop2-yarn + + + org.apache.hadoop + hadoop-client + provided + + + org.apache.hadoop + hadoop-yarn-api + provided + + + org.apache.hadoop + hadoop-yarn-common + provided + + + + + + org.apache.maven.plugins + maven-jar-plugin + + hadoop2-yarn + + + + + -- cgit v1.2.3