aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorJey Kottalam <jey@cs.berkeley.edu>2013-07-17 17:33:38 -0700
committerJey Kottalam <jey@cs.berkeley.edu>2013-08-15 16:50:37 -0700
commit69c3bbf688cdd21171413d415cfc6d6cb8e77bd5 (patch)
tree27f79044537673218264df8d983c710b7bdb2117 /core
parentf67b94ad4fc8c9e7a71dd7f65d617743947ae91c (diff)
downloadspark-69c3bbf688cdd21171413d415cfc6d6cb8e77bd5.tar.gz
spark-69c3bbf688cdd21171413d415cfc6d6cb8e77bd5.tar.bz2
spark-69c3bbf688cdd21171413d415cfc6d6cb8e77bd5.zip
dynamically detect hadoop version
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala26
-rw-r--r--core/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala30
2 files changed, 48 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/main/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
index 25386b2796..6cfafd3760 100644
--- a/core/src/main/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
@@ -18,10 +18,28 @@
package org.apache.hadoop.mapred
trait HadoopMapRedUtil {
- def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContext(conf, jobId)
+ def newJobContext(conf: JobConf, jobId: JobID): JobContext = {
+ val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl", "org.apache.hadoop.mapred.JobContext");
+ val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[org.apache.hadoop.mapreduce.JobID])
+ ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
+ }
- def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId)
+ def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = {
+ val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl", "org.apache.hadoop.mapred.TaskAttemptContext")
+ val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[TaskAttemptID])
+ ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
+ }
- def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier,
- jobId, isMap, taskId, attemptId)
+ def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = {
+ new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)
+ }
+
+ private def firstAvailableClass(first: String, second: String): Class[_] = {
+ try {
+ Class.forName(first)
+ } catch {
+ case e: ClassNotFoundException =>
+ Class.forName(second)
+ }
+ }
}
diff --git a/core/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
index b1002e0cac..0f77828dc8 100644
--- a/core/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
+++ b/core/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
@@ -20,10 +20,32 @@ package org.apache.hadoop.mapreduce
import org.apache.hadoop.conf.Configuration
trait HadoopMapReduceUtil {
- def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContext(conf, jobId)
+ def newJobContext(conf: Configuration, jobId: JobID): JobContext = {
+ val klass = firstAvailableClass(
+ "org.apache.hadoop.mapreduce.task.JobContextImpl",
+ "org.apache.hadoop.mapreduce.JobContext")
+ val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[JobID])
+ ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
+ }
- def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId)
+ def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = {
+ val klass = firstAvailableClass(
+ "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl",
+ "org.apache.hadoop.mapreduce.TaskAttemptContext")
+ val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[TaskAttemptID])
+ ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
+ }
- def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier,
- jobId, isMap, taskId, attemptId)
+ def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = {
+ new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)
+ }
+
+ private def firstAvailableClass(first: String, second: String): Class[_] = {
+ try {
+ Class.forName(first)
+ } catch {
+ case e: ClassNotFoundException =>
+ Class.forName(second)
+ }
+ }
}