From 8f979edef5b80967b81323e13dcafd5aac92feb1 Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Tue, 6 Aug 2013 15:47:49 -0700 Subject: Fix newTaskAttemptID to work under YARN --- .../hadoop/mapreduce/SparkHadoopMapReduceUtil.scala | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) (limited to 'core/src/main/scala/org/apache/hadoop') diff --git a/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala index bea6253677..93180307fa 100644 --- a/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala +++ b/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala @@ -18,6 +18,7 @@ package org.apache.hadoop.mapreduce import org.apache.hadoop.conf.Configuration +import java.lang.{Integer => JInteger, Boolean => JBoolean} trait SparkHadoopMapReduceUtil { def newJobContext(conf: Configuration, jobId: JobID): JobContext = { @@ -37,7 +38,24 @@ trait SparkHadoopMapReduceUtil { } def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = { - new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId) + val klass = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptID"); + try { + // first, attempt to use the old-style constructor that takes a boolean isMap (not available in YARN) + val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], classOf[Boolean], + classOf[Int], classOf[Int]) + ctor.newInstance(jtIdentifier, new JInteger(jobId), new JBoolean(isMap), new JInteger(taskId), new + JInteger(attemptId)).asInstanceOf[TaskAttemptID] + } catch { + case exc: NoSuchMethodException => { + // failed, look for the new ctor that takes a TaskType (not available in 1.x) + val taskTypeClass = Class.forName("org.apache.hadoop.mapreduce.TaskType").asInstanceOf[Class[Enum[_]]] + val taskType = taskTypeClass.getMethod("valueOf", classOf[String]).invoke(taskTypeClass, if(isMap) "MAP" else "REDUCE") + val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], taskTypeClass, + classOf[Int], classOf[Int]) + ctor.newInstance(jtIdentifier, new JInteger(jobId), taskType, new JInteger(taskId), new + JInteger(attemptId)).asInstanceOf[TaskAttemptID] + } + } } private def firstAvailableClass(first: String, second: String): Class[_] = { -- cgit v1.2.3