author     Sean Owen <sowen@cloudera.com>         2014-11-09 22:11:20 -0800
committer  Patrick Wendell <pwendell@gmail.com>   2014-11-09 22:11:20 -0800
commit     f8e5732307dcb1482d9bcf1162a1090ef9a7b913
tree       ff4bdd9ef7f96d458b4aafff8c6a7f18309611b3
parent     f73b56f5e5d94f83d980475d3f39548986a92dd6
SPARK-1209 [CORE] (Take 2) SparkHadoop{MapRed,MapReduce}Util should not use package org.apache.hadoop
andrewor14: Another try at SPARK-1209, to address https://github.com/apache/spark/pull/2814#issuecomment-61197619
I successfully tested with `mvn -Dhadoop.version=1.0.4 -DskipTests clean package; mvn -Dhadoop.version=1.0.4 test`, which I assume is what failed Jenkins last time. I also tried `-Dhadoop.version=1.2.1` and `-Phadoop-2.4 -Pyarn -Phive` for more coverage.
So this is why the class was put in `org.apache.hadoop` to begin with, I assume. One option is to leave this as-is for now and move it only when Hadoop 1.0.x support goes away.
This PR takes the other option: it adds a reflective call that forces the relevant constructors to be accessible at run time. That is probably less surprising than keeping Spark code in `org.apache.hadoop`, but it does involve reflection. A `SecurityManager` might forbid this, but it would also forbid a lot of other things Spark does. And it appears to affect only Hadoop 1.0.x.
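To make the approach concrete, here is a minimal, self-contained sketch of the reflection trick the patch applies. The helper name and signature are illustrative only; the actual code lives in `SparkHadoopMapRedUtil` and `SparkHadoopMapReduceUtil` in the diff below.

```scala
import java.lang.reflect.Modifier

object ReflectionSketch {
  // Look up a declared constructor; if it is not public (as with JobContext's
  // constructor in Hadoop 1.0.x), force accessibility before instantiating.
  // Note that a SecurityManager could veto setAccessible.
  def newInstanceViaReflection[T](className: String,
                                  argTypes: Seq[Class[_]],
                                  args: AnyRef*): T = {
    val ctor = Class.forName(className).getDeclaredConstructor(argTypes: _*)
    if (!Modifier.isPublic(ctor.getModifiers)) {
      ctor.setAccessible(true)
    }
    ctor.newInstance(args: _*).asInstanceOf[T]
  }
}
```

On Hadoop 1.0.4, a call shaped like `newInstanceViaReflection[JobContext]("org.apache.hadoop.mapred.JobContext", Seq(classOf[JobConf], classOf[org.apache.hadoop.mapreduce.JobID]), conf, jobId)` would exercise the `setAccessible` path; on Hadoop 2.x the constructor is already public and that branch is skipped.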
Author: Sean Owen <sowen@cloudera.com>
Closes #3048 from srowen/SPARK-1209 and squashes the following commits:
0d48f4b [Sean Owen] For Hadoop 1.0.x, make certain constructors public, which were public in later versions
466e179 [Sean Owen] Disable MIMA warnings resulting from moving the class -- this was also part of the PairRDDFunctions type hierarchy though?
eb61820 [Sean Owen] Move SparkHadoopMapRedUtil / SparkHadoopMapReduceUtil from org.apache.hadoop to org.apache.spark
 core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala                               |  1
 core/src/main/scala/org/apache/{hadoop => spark}/mapred/SparkHadoopMapRedUtil.scala        | 17
 core/src/main/scala/org/apache/{hadoop => spark}/mapreduce/SparkHadoopMapReduceUtil.scala  |  5
 core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala                                |  1
 core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala                            |  3
 project/MimaExcludes.scala                                                                 |  8
 sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala          |  1
 sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala               |  1
 8 files changed, 32 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index 376e69cd99..4023759657 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -26,6 +26,7 @@
 import org.apache.hadoop.mapred._
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.fs.Path
+import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.rdd.HadoopRDD
 
 /**
diff --git a/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index 0c47afae54..21b782edd2 100644
--- a/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -15,15 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapred
+package org.apache.spark.mapred
 
-private[apache]
+import java.lang.reflect.Modifier
+
+import org.apache.hadoop.mapred.{TaskAttemptID, JobID, JobConf, JobContext, TaskAttemptContext}
+
+private[spark]
 trait SparkHadoopMapRedUtil {
   def newJobContext(conf: JobConf, jobId: JobID): JobContext = {
     val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl",
       "org.apache.hadoop.mapred.JobContext")
     val ctor = klass.getDeclaredConstructor(classOf[JobConf],
       classOf[org.apache.hadoop.mapreduce.JobID])
+    // In Hadoop 1.0.x, JobContext is an interface, and JobContextImpl is package private.
+    // Make it accessible if it's not in order to access it.
+    if (!Modifier.isPublic(ctor.getModifiers)) {
+      ctor.setAccessible(true)
+    }
     ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
   }
 
@@ -31,6 +40,10 @@ trait SparkHadoopMapRedUtil {
     val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl",
       "org.apache.hadoop.mapred.TaskAttemptContext")
     val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[TaskAttemptID])
+    // See above
+    if (!Modifier.isPublic(ctor.getModifiers)) {
+      ctor.setAccessible(true)
+    }
     ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
   }
diff --git a/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala
index 1fca5729c6..3340673f91 100644
--- a/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala
@@ -15,13 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapreduce
+package org.apache.spark.mapreduce
 
 import java.lang.{Boolean => JBoolean, Integer => JInteger}
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce.{JobContext, JobID, TaskAttemptContext, TaskAttemptID}
 
-private[apache]
+private[spark]
 trait SparkHadoopMapReduceUtil {
   def newJobContext(conf: Configuration, jobId: JobID): JobContext = {
     val klass = firstAvailableClass(
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 351e145f96..e55d03d391 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -35,6 +35,7 @@ import org.apache.spark.Partition
 import org.apache.spark.SerializableWritable
 import org.apache.spark.{SparkContext, TaskContext}
 import org.apache.spark.executor.{DataReadMethod, InputMetrics}
+import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
 import org.apache.spark.util.Utils
 import org.apache.spark.deploy.SparkHadoopUtil
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index da89f634ab..462f0d6268 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -33,13 +33,14 @@ import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
 import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat,
-  RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil}
+  RecordWriter => NewRecordWriter}
 
 import org.apache.spark._
 import org.apache.spark.Partitioner.defaultPartitioner
 import org.apache.spark.SparkContext._
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util.Utils
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 6a0495f8fd..a94d09be3b 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -77,6 +77,14 @@ object MimaExcludes {
             // SPARK-3822
             ProblemFilters.exclude[IncompatibleResultTypeProblem](
               "org.apache.spark.SparkContext.org$apache$spark$SparkContext$$createTaskScheduler")
+          ) ++ Seq(
+            // SPARK-1209
+            ProblemFilters.exclude[MissingClassProblem](
+              "org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil"),
+            ProblemFilters.exclude[MissingClassProblem](
+              "org.apache.hadoop.mapred.SparkHadoopMapRedUtil"),
+            ProblemFilters.exclude[MissingTypesProblem](
+              "org.apache.spark.rdd.PairRDDFunctions")
           )
 
         case v if v.startsWith("1.1") =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index d00860a8bb..74c43e053b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -43,6 +43,7 @@ import parquet.hadoop.util.ContextUtil
 import parquet.io.ParquetDecodingException
 import parquet.schema.MessageType
 
+import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.SQLConf
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index bf2ce9df67..cc8bb3e172 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.plan.{PlanUtils, TableDesc}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred._
 
+import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.sql.Row
 import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
 import org.apache.spark.sql.hive.{ShimFileSinkDesc => FileSinkDesc}
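The hunks above call `firstAvailableClass`, which is defined elsewhere in these traits and not shown in this diff. It presumably just resolves whichever of the two class names is present on the classpath, along these lines (a sketch under that assumption, not part of this patch):

```scala
trait ClassResolutionSketch {
  // Try the Hadoop 2.x implementation class first, then fall back to the
  // Hadoop 1.x class that plays the same role. This mirrors how the traits
  // above choose between e.g. JobContextImpl and JobContext.
  private def firstAvailableClass(first: String, second: String): Class[_] = {
    try {
      Class.forName(first)
    } catch {
      case _: ClassNotFoundException => Class.forName(second)
    }
  }
}
```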