author    Sean Owen <sowen@cloudera.com>        2014-11-09 22:11:20 -0800
committer Patrick Wendell <pwendell@gmail.com>  2014-11-09 22:11:20 -0800
commit    f8e5732307dcb1482d9bcf1162a1090ef9a7b913 (patch)
tree      ff4bdd9ef7f96d458b4aafff8c6a7f18309611b3 /core/src
parent    f73b56f5e5d94f83d980475d3f39548986a92dd6 (diff)
SPARK-1209 [CORE] (Take 2) SparkHadoop{MapRed,MapReduce}Util should not use package org.apache.hadoop
andrewor14 Another try at SPARK-1209, to address https://github.com/apache/spark/pull/2814#issuecomment-61197619

I successfully tested with `mvn -Dhadoop.version=1.0.4 -DskipTests clean package; mvn -Dhadoop.version=1.0.4 test`. I assume that is what failed Jenkins last time. I also tried `-Dhadoop.version=1.2.1` and `-Phadoop-2.4 -Pyarn -Phive` for more coverage.

So this is why the class was put in `org.apache.hadoop` to begin with, I assume. One option is to leave this as-is for now and move it only when Hadoop 1.0.x support goes away.

This is the other option, which adds a call to force the constructor to be public at run-time. It's probably less surprising than putting Spark code in `org.apache.hadoop`, but it does involve reflection. A `SecurityManager` might forbid this, but it would forbid a lot of stuff Spark does. This would also only affect Hadoop 1.0.x, it seems.

Author: Sean Owen <sowen@cloudera.com>

Closes #3048 from srowen/SPARK-1209 and squashes the following commits:

0d48f4b [Sean Owen] For Hadoop 1.0.x, make certain constructors public, which were public in later versions
466e179 [Sean Owen] Disable MIMA warnings resulting from moving the class -- this was also part of the PairRDDFunctions type hierarchy though?
eb61820 [Sean Owen] Move SparkHadoopMapRedUtil / SparkHadoopMapReduceUtil from org.apache.hadoop to org.apache.spark
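For readers unfamiliar with the run-time workaround the message describes: the fix checks a reflectively obtained constructor's modifiers and calls `setAccessible(true)` when the constructor is not public. A minimal, self-contained sketch of that pattern follows; the `AccessibleCtorSketch` object, the `newByName` helper, and its single-`String` constructor signature are illustrative assumptions, not code from this commit.

```scala
import java.lang.reflect.Modifier

object AccessibleCtorSketch {
  // Illustrative helper: instantiate a class whose single-String constructor
  // may be package-private in some library versions (as JobContextImpl's
  // constructor is in Hadoop 1.0.x).
  def newByName(className: String, arg: String): AnyRef = {
    val ctor = Class.forName(className).getDeclaredConstructor(classOf[String])
    // Force the constructor accessible at run-time if it is not public.
    // Note: a restrictive SecurityManager could reject setAccessible,
    // the caveat the commit message raises.
    if (!Modifier.isPublic(ctor.getModifiers)) {
      ctor.setAccessible(true)
    }
    ctor.newInstance(arg).asInstanceOf[AnyRef]
  }
}
```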
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala                                                                                         |  1
-rw-r--r--  core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala (renamed from core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala)             | 17
-rw-r--r--  core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala (renamed from core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala) |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala                                                                                          |  1
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala                                                                                      |  3
5 files changed, 22 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index 376e69cd99..4023759657 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapred._
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
+import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.HadoopRDD
/**
diff --git a/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index 0c47afae54..21b782edd2 100644
--- a/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -15,15 +15,24 @@
* limitations under the License.
*/
-package org.apache.hadoop.mapred
+package org.apache.spark.mapred
-private[apache]
+import java.lang.reflect.Modifier
+
+import org.apache.hadoop.mapred.{TaskAttemptID, JobID, JobConf, JobContext, TaskAttemptContext}
+
+private[spark]
trait SparkHadoopMapRedUtil {
def newJobContext(conf: JobConf, jobId: JobID): JobContext = {
val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl",
"org.apache.hadoop.mapred.JobContext")
val ctor = klass.getDeclaredConstructor(classOf[JobConf],
classOf[org.apache.hadoop.mapreduce.JobID])
+ // In Hadoop 1.0.x, JobContext is an interface, and JobContextImpl is package private.
+ // If the constructor is not public, make it accessible so it can be invoked.
+ if (!Modifier.isPublic(ctor.getModifiers)) {
+ ctor.setAccessible(true)
+ }
ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
}
@@ -31,6 +40,10 @@ trait SparkHadoopMapRedUtil {
val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl",
"org.apache.hadoop.mapred.TaskAttemptContext")
val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[TaskAttemptID])
+ // See above
+ if (!Modifier.isPublic(ctor.getModifiers)) {
+ ctor.setAccessible(true)
+ }
ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
}
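Both `newJobContext` and `newTaskAttemptContext` above resolve the right class through a `firstAvailableClass` helper whose definition falls outside these hunks. As a sketch of how such a lookup plausibly works, preferring the Hadoop 2.x `*Impl` name and falling back to the Hadoop 1.x name, assuming a simple try/catch (an illustration, not the exact body from the file):

```scala
// Illustrative sketch only: resolve whichever of two class names exists
// on the classpath, preferring the first (e.g. the Hadoop 2.x *Impl class).
private def firstAvailableClass(first: String, second: String): Class[_] = {
  try {
    Class.forName(first)
  } catch {
    case _: ClassNotFoundException => Class.forName(second)
  }
}
```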
diff --git a/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala
index 1fca5729c6..3340673f91 100644
--- a/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala
@@ -15,13 +15,14 @@
* limitations under the License.
*/
-package org.apache.hadoop.mapreduce
+package org.apache.spark.mapreduce
import java.lang.{Boolean => JBoolean, Integer => JInteger}
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce.{JobContext, JobID, TaskAttemptContext, TaskAttemptID}
-private[apache]
+private[spark]
trait SparkHadoopMapReduceUtil {
def newJobContext(conf: Configuration, jobId: JobID): JobContext = {
val klass = firstAvailableClass(
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 351e145f96..e55d03d391 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -35,6 +35,7 @@ import org.apache.spark.Partition
import org.apache.spark.SerializableWritable
import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.executor.{DataReadMethod, InputMetrics}
+import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
import org.apache.spark.util.Utils
import org.apache.spark.deploy.SparkHadoopUtil
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index da89f634ab..462f0d6268 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -33,13 +33,14 @@ import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat,
-RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil}
+RecordWriter => NewRecordWriter}
import org.apache.spark._
import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.annotation.Experimental
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer
import org.apache.spark.util.Utils