aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorJey Kottalam <jey@cs.berkeley.edu>2013-08-06 15:47:49 -0700
committerJey Kottalam <jey@cs.berkeley.edu>2013-08-15 16:50:37 -0700
commit8f979edef5b80967b81323e13dcafd5aac92feb1 (patch)
tree07375f141f62ab293479b80b21397165174c329d /core
parent14b6bcdf93642624c42fa04aeaff9fff97f6e07f (diff)
downloadspark-8f979edef5b80967b81323e13dcafd5aac92feb1.tar.gz
spark-8f979edef5b80967b81323e13dcafd5aac92feb1.tar.bz2
spark-8f979edef5b80967b81323e13dcafd5aac92feb1.zip
Fix newTaskAttemptID to work under YARN
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala20
1 files changed, 19 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
index bea6253677..93180307fa 100644
--- a/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
+++ b/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
@@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce
import org.apache.hadoop.conf.Configuration
+import java.lang.{Integer => JInteger, Boolean => JBoolean}
trait SparkHadoopMapReduceUtil {
def newJobContext(conf: Configuration, jobId: JobID): JobContext = {
@@ -37,7 +38,24 @@ trait SparkHadoopMapReduceUtil {
}
def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = {
- new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)
+ val klass = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptID");
+ try {
+ // first, attempt to use the old-style constructor that takes a boolean isMap (not available in YARN)
+ val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], classOf[Boolean],
+ classOf[Int], classOf[Int])
+ ctor.newInstance(jtIdentifier, new JInteger(jobId), new JBoolean(isMap), new JInteger(taskId), new
+ JInteger(attemptId)).asInstanceOf[TaskAttemptID]
+ } catch {
+ case exc: NoSuchMethodException => {
+ // failed, look for the new ctor that takes a TaskType (not available in 1.x)
+ val taskTypeClass = Class.forName("org.apache.hadoop.mapreduce.TaskType").asInstanceOf[Class[Enum[_]]]
+ val taskType = taskTypeClass.getMethod("valueOf", classOf[String]).invoke(taskTypeClass, if(isMap) "MAP" else "REDUCE")
+ val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], taskTypeClass,
+ classOf[Int], classOf[Int])
+ ctor.newInstance(jtIdentifier, new JInteger(jobId), taskType, new JInteger(taskId), new
+ JInteger(attemptId)).asInstanceOf[TaskAttemptID]
+ }
+ }
}
private def firstAvailableClass(first: String, second: String): Class[_] = {