author    zsxwing <zsxwing@gmail.com>    2015-08-06 21:42:42 -0700
committer Reynold Xin <rxin@databricks.com>    2015-08-06 21:42:42 -0700
commit    672f467668da1cf20895ee57652489c306120288 (patch)
tree      efaf5d6b098df53580fbde9745c81a89aa823e58 /core
parent    fe12277b40082585e40e1bdf6aa2ebcfe80ed83f (diff)
[SPARK-8057][Core] Call TaskAttemptContext.getTaskAttemptID using Reflection
Someone may use the Spark core jar from the Maven repo with Hadoop 1. SPARK-2075 already resolved the compatibility issue to support this, but `SparkHadoopMapRedUtil.commitTask` recently broke it. This PR uses reflection to call `TaskAttemptContext.getTaskAttemptID`, fixing the compatibility issue.

Author: zsxwing <zsxwing@gmail.com>

Closes #6599 from zsxwing/SPARK-8057 and squashes the following commits:

f7a343c [zsxwing] Remove the redundant import
6b7f1af [zsxwing] Call TaskAttemptContext.getTaskAttemptID using Reflection
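The incompatibility is a bytecode issue: when the compile-time type of the receiver is an interface (TaskAttemptContext in Hadoop 2.x), the compiler emits invokeinterface, and when it is a class (Hadoop 1.x), it emits invokevirtual, so a jar compiled against one Hadoop version typically fails against the other at runtime with an IncompatibleClassChangeError. Looking the method up reflectively on the runtime class sidesteps the problem. A minimal, self-contained sketch of the pattern (the wrapper object name is illustrative; the method body mirrors the patch in the diff below):

    import org.apache.hadoop.mapreduce.{TaskAttemptContext => MapReduceTaskAttemptContext}
    import org.apache.hadoop.mapreduce.{TaskAttemptID => MapReduceTaskAttemptID}

    object ReflectiveTaskAttemptID {
      // Resolve getTaskAttemptID against the runtime class instead of the
      // compile-time type, so the call site's bytecode is identical whether
      // TaskAttemptContext is a class (Hadoop 1.x) or an interface (Hadoop 2.x).
      def get(context: MapReduceTaskAttemptContext): MapReduceTaskAttemptID = {
        val method = context.getClass.getMethod("getTaskAttemptID")
        method.invoke(context).asInstanceOf[MapReduceTaskAttemptID]
      }
    }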
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala      | 14
-rw-r--r--  core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala | 3
2 files changed, 16 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index e06b06e06f..7e9dba42be 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -34,6 +34,8 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.{TaskAttemptContext => MapReduceTaskAttemptContext}
+import org.apache.hadoop.mapreduce.{TaskAttemptID => MapReduceTaskAttemptID}
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.spark.annotation.DeveloperApi
@@ -195,6 +197,18 @@ class SparkHadoopUtil extends Logging {
}
/**
+   * Uses reflection to call `getTaskAttemptID` on a TaskAttemptContext. Calling
+   * `TaskAttemptContext.getTaskAttemptID` directly would generate different bytecode
+   * for Hadoop 1.x and Hadoop 2.x, because TaskAttemptContext is a class in Hadoop 1.x
+   * but an interface in Hadoop 2.x.
+   */
+  def getTaskAttemptIDFromTaskAttemptContext(
+      context: MapReduceTaskAttemptContext): MapReduceTaskAttemptID = {
+    val method = context.getClass.getMethod("getTaskAttemptID")
+    method.invoke(context).asInstanceOf[MapReduceTaskAttemptID]
+  }
+
+  /**
* Get [[FileStatus]] objects for all leaf children (files) under the given base path. If the
* given path points to a file, return a single-element collection containing [[FileStatus]] of
* that file.
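Call sites then go through the new helper rather than invoking getTaskAttemptID on the interface/class directly. A hedged usage sketch (assuming an in-scope mrTaskContext: MapReduceTaskAttemptContext; SparkHadoopUtil.get is the singleton accessor the diff below uses):

    // Same TaskAttemptID as a direct call would return; only the call site's
    // bytecode changes, keeping one jar binary-compatible with Hadoop 1.x and 2.x.
    val mrTaskAttemptID = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(mrTaskContext)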
diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index 87df42748b..f405b732e4 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.mapred._
import org.apache.hadoop.mapreduce.{TaskAttemptContext => MapReduceTaskAttemptContext}
import org.apache.hadoop.mapreduce.{OutputCommitter => MapReduceOutputCommitter}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.CommitDeniedException
import org.apache.spark.{Logging, SparkEnv, TaskContext}
import org.apache.spark.util.{Utils => SparkUtils}
@@ -93,7 +94,7 @@ object SparkHadoopMapRedUtil extends Logging {
      splitId: Int,
      attemptId: Int): Unit = {
-    val mrTaskAttemptID = mrTaskContext.getTaskAttemptID
+    val mrTaskAttemptID = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(mrTaskContext)
    // Called after we have decided to commit
    def performCommit(): Unit = {