author     Jey Kottalam <jey@cs.berkeley.edu>    2013-07-17 17:33:38 -0700
committer  Jey Kottalam <jey@cs.berkeley.edu>    2013-08-15 16:50:37 -0700
commit     69c3bbf688cdd21171413d415cfc6d6cb8e77bd5
tree       27f79044537673218264df8d983c710b7bdb2117
parent     f67b94ad4fc8c9e7a71dd7f65d617743947ae91c
dynamically detect hadoop version

Replace the compile-time HADOOP_MAJOR_VERSION switch (and the per-version
source directories it selected) with runtime detection: try to load the
Hadoop 2 context classes first and fall back to their Hadoop 1 equivalents.
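In a minimal standalone sketch of that probe (HadoopVersionProbe and
hadoopIsVersion2 are illustrative names, not part of this patch), detecting
the major version reduces to a single Class.forName call:

    object HadoopVersionProbe {
      // org.apache.hadoop.mapreduce.task.JobContextImpl exists only in
      // Hadoop 2, where the old concrete JobContext became an interface;
      // a successful load therefore means Hadoop 2 is on the classpath.
      def hadoopIsVersion2: Boolean =
        try {
          Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl")
          true
        } catch {
          case _: ClassNotFoundException => false
        }
    }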
-rw-r--r--  core/src/main/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala        26
-rw-r--r--  core/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala  30
-rw-r--r--  project/SparkBuild.scala                                                    35
3 files changed, 51 insertions(+), 40 deletions(-)
diff --git a/core/src/main/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/main/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
index 25386b2796..6cfafd3760 100644
--- a/core/src/main/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
@@ -18,10 +18,28 @@
package org.apache.hadoop.mapred
trait HadoopMapRedUtil {
- def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContext(conf, jobId)
+ def newJobContext(conf: JobConf, jobId: JobID): JobContext = {
+ val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl", "org.apache.hadoop.mapred.JobContext")
+ val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[org.apache.hadoop.mapreduce.JobID])
+ ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
+ }
- def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId)
+ def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = {
+ val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl", "org.apache.hadoop.mapred.TaskAttemptContext")
+ val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[TaskAttemptID])
+ ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
+ }
- def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier,
- jobId, isMap, taskId, attemptId)
+ def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = {
+ new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)
+ }
+
+ private def firstAvailableClass(first: String, second: String): Class[_] = {
+ try {
+ Class.forName(first)
+ } catch {
+ case e: ClassNotFoundException =>
+ Class.forName(second)
+ }
+ }
}
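The reflection above is what lets one binary run on both lines: in Hadoop 1
the old-API JobContext and TaskAttemptContext are concrete classes, while
Hadoop 2 turns them into interfaces backed by JobContextImpl and
TaskAttemptContextImpl, so a plain `new` cannot compile against both. A
hedged usage sketch, with MapRedUtilDemo as a hypothetical caller that is
not part of this commit:

    import org.apache.hadoop.mapred.{HadoopMapRedUtil, JobConf}

    // Hypothetical caller: mixes in the trait and obtains a
    // TaskAttemptContext without naming either version's concrete class.
    object MapRedUtilDemo extends HadoopMapRedUtil {
      def demo(): Unit = {
        val conf = new JobConf()
        // jtIdentifier, jobId, isMap, taskId, attemptId
        val attempt = newTaskAttemptID("jt", 1, true, 0, 0)
        val ctx = newTaskAttemptContext(conf, attempt)
        println(ctx.getTaskAttemptID)
      }
    }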
diff --git a/core/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
index b1002e0cac..0f77828dc8 100644
--- a/core/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
+++ b/core/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
@@ -20,10 +20,32 @@ package org.apache.hadoop.mapreduce
import org.apache.hadoop.conf.Configuration
trait HadoopMapReduceUtil {
- def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContext(conf, jobId)
+ def newJobContext(conf: Configuration, jobId: JobID): JobContext = {
+ val klass = firstAvailableClass(
+ "org.apache.hadoop.mapreduce.task.JobContextImpl",
+ "org.apache.hadoop.mapreduce.JobContext")
+ val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[JobID])
+ ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
+ }
- def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId)
+ def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = {
+ val klass = firstAvailableClass(
+ "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl",
+ "org.apache.hadoop.mapreduce.TaskAttemptContext")
+ val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[TaskAttemptID])
+ ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
+ }
- def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier,
- jobId, isMap, taskId, attemptId)
+ def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = {
+ new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)
+ }
+
+ private def firstAvailableClass(first: String, second: String): Class[_] = {
+ try {
+ Class.forName(first)
+ } catch {
+ case e: ClassNotFoundException =>
+ Class.forName(second)
+ }
+ }
}
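Same idiom for the new-style API; the only difference is that Hadoop 2 moved
the implementations into the org.apache.hadoop.mapreduce.task package.
Another illustrative caller (MapReduceUtilDemo is hypothetical):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.{HadoopMapReduceUtil, JobID}

    // Under Hadoop 1 newJobContext reflectively invokes
    // new mapreduce.JobContext(conf, jobId); under Hadoop 2 it invokes
    // new mapreduce.task.JobContextImpl(conf, jobId).
    object MapReduceUtilDemo extends HadoopMapReduceUtil {
      def demo(): Unit = {
        val ctx = newJobContext(new Configuration(), new JobID("jt", 1))
        println(ctx.getConfiguration.get("mapreduce.job.name"))
      }
    }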
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index f6519c8287..a06550bb97 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -27,13 +27,8 @@ object SparkBuild extends Build {
// Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or
// "1.0.4" for Apache releases, or "0.20.2-cdh3u5" for Cloudera Hadoop.
val HADOOP_VERSION = "1.0.4"
- val HADOOP_MAJOR_VERSION = "1"
- val HADOOP_YARN = false
-
- // For Hadoop 2 versions such as "2.0.0-mr1-cdh4.1.1", set the HADOOP_MAJOR_VERSION to "2"
//val HADOOP_VERSION = "2.0.0-mr1-cdh4.1.1"
- //val HADOOP_MAJOR_VERSION = "2"
- //val HADOOP_YARN = false
+ val HADOOP_YARN = false
// For Hadoop 2 YARN support
//val HADOOP_VERSION = "2.0.2-alpha"
@@ -184,37 +179,13 @@ object SparkBuild extends Build {
"org.apache.mesos" % "mesos" % "0.12.1",
"io.netty" % "netty-all" % "4.0.0.Beta2",
"org.apache.derby" % "derby" % "10.4.2.0" % "test",
+ "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION,
"com.codahale.metrics" % "metrics-core" % "3.0.0",
"com.codahale.metrics" % "metrics-jvm" % "3.0.0",
"com.codahale.metrics" % "metrics-json" % "3.0.0",
"com.twitter" % "chill_2.9.3" % "0.3.1",
"com.twitter" % "chill-java" % "0.3.1"
- ) ++ (
- if (HADOOP_MAJOR_VERSION == "2") {
- if (HADOOP_YARN) {
- Seq(
- // Exclude rule required for all ?
- "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm),
- "org.apache.hadoop" % "hadoop-yarn-api" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm),
- "org.apache.hadoop" % "hadoop-yarn-common" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm),
- "org.apache.hadoop" % "hadoop-yarn-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm)
- )
- } else {
- Seq(
- "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm),
- "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm)
- )
- }
- } else {
- Seq("org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty) )
- }),
- unmanagedSourceDirectories in Compile <+= baseDirectory{ _ /
- ( if (HADOOP_YARN && HADOOP_MAJOR_VERSION == "2") {
- "src/hadoop2-yarn/scala"
- } else {
- "src/hadoop" + HADOOP_MAJOR_VERSION + "/scala"
- } )
- }
+ )
) ++ assemblySettings ++ extraAssemblySettings
def rootSettings = sharedSettings ++ Seq(
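With the runtime probe in place, the build no longer branches on a major
version: a single hadoop-client dependency replaces the version-switched
dependency sets, and HADOOP_VERSION is the only knob left. A hedged sketch
of pointing the build at a Hadoop 2 MR1 release (the version string is the
one the comments above mention; whether it resolves in your repositories is
an assumption):

    // project/SparkBuild.scala -- illustrative change, not part of this patch:
    // only the version string moves; no HADOOP_MAJOR_VERSION flag and no
    // per-version source directory has to change with it.
    val HADOOP_VERSION = "2.0.0-mr1-cdh4.1.1"
    val HADOOP_YARN = false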