aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2014-12-21 22:10:19 -0800
committerReynold Xin <rxin@databricks.com>2014-12-21 22:10:30 -0800
commit665653d240c0563f23ad6922b217ff6f0d548874 (patch)
tree140d256f64cec2761cdfb26a16dda7dd81931139 /core
parent4346a2ba1a1c62732c5e313b464019fe6259a4a8 (diff)
downloadspark-665653d240c0563f23ad6922b217ff6f0d548874.tar.gz
spark-665653d240c0563f23ad6922b217ff6f0d548874.tar.bz2
spark-665653d240c0563f23ad6922b217ff6f0d548874.zip
[SPARK-2075][Core] Make the compiler generate same bytes code for Hadoop 1.+ and Hadoop 2.+
`NullWritable` is a `Comparable` rather than `Comparable[NullWritable]` in Hadoop 1.+, so the compiler cannot find an implicit Ordering for it. It will generate different anonymous classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+. Therefore, here we provide an Ordering for NullWritable so that the compiler will generate same codes. I used the following commands to confirm the generated byte codes are some. ``` mvn -Dhadoop.version=1.2.1 -DskipTests clean package -pl core -am javap -private -c -classpath core/target/scala-2.10/classes org.apache.spark.rdd.RDD > ~/hadoop1.txt mvn -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -DskipTests clean package -pl core -am javap -private -c -classpath core/target/scala-2.10/classes org.apache.spark.rdd.RDD > ~/hadoop2.txt diff ~/hadoop1.txt ~/hadoop2.txt ``` However, the compiler will generate different codes for the classes which call methods of `JobContext/TaskAttemptContext`. `JobContext/TaskAttemptContext` is a class in Hadoop 1.+, and calling its method will use `invokevirtual`, while it's an interface in Hadoop 2.+, and will use `invokeinterface`. To fix it, we can use reflection to call `JobContext/TaskAttemptContext.getConfiguration`. Author: zsxwing <zsxwing@gmail.com> Closes #3740 from zsxwing/SPARK-2075 and squashes the following commits: 39d9df2 [zsxwing] Fix the code style e4ad8b5 [zsxwing] Use null for the implicit Ordering 734bac9 [zsxwing] Explicitly set the implicit parameters ca03559 [zsxwing] Use reflection to access JobContext/TaskAttemptContext.getConfiguration fa40db0 [zsxwing] Add an Ordering for NullWritable to make the compiler generate same byte codes for RDD (cherry picked from commit 6ee6aa70b7d52408cc66bd1434cbeae3212e3f01) Signed-off-by: Reynold Xin <rxin@databricks.com>
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/input/PortableDataStream.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDD.scala21
6 files changed, 41 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 60ee115e39..57f9faf5dd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.FileSystem.Statistics
import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
@@ -183,6 +184,17 @@ class SparkHadoopUtil extends Logging {
Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
statisticsDataClass.getDeclaredMethod(methodName)
}
+
+ /**
+ * Using reflection to get the Configuration from JobContext/TaskAttemptContext. If we directly
+ * call `JobContext/TaskAttemptContext.getConfiguration`, it will generate different byte codes
+ * for Hadoop 1.+ and Hadoop 2.+ because JobContext/TaskAttemptContext is class in Hadoop 1.+
+ * while it's interface in Hadoop 2.+.
+ */
+ def getConfigurationFromJobContext(context: JobContext): Configuration = {
+ val method = context.getClass.getMethod("getConfiguration")
+ method.invoke(context).asInstanceOf[Configuration]
+ }
}
object SparkHadoopUtil {
diff --git a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
index 89b29af200..c219d21fbe 100644
--- a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
+++ b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
+import org.apache.spark.deploy.SparkHadoopUtil
/**
* Custom Input Format for reading and splitting flat binary files that contain records,
@@ -33,7 +34,7 @@ private[spark] object FixedLengthBinaryInputFormat {
/** Retrieves the record length property from a Hadoop configuration */
def getRecordLength(context: JobContext): Int = {
- context.getConfiguration.get(RECORD_LENGTH_PROPERTY).toInt
+ SparkHadoopUtil.get.getConfigurationFromJobContext(context).get(RECORD_LENGTH_PROPERTY).toInt
}
}
diff --git a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala
index 36a1e5d475..67a96925da 100644
--- a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala
+++ b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory
import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.spark.deploy.SparkHadoopUtil
/**
* FixedLengthBinaryRecordReader is returned by FixedLengthBinaryInputFormat.
@@ -82,7 +83,7 @@ private[spark] class FixedLengthBinaryRecordReader
// the actual file we will be reading from
val file = fileSplit.getPath
// job configuration
- val job = context.getConfiguration
+ val job = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
// check compression
val codec = new CompressionCodecFactory(job).getCodec(file)
if (codec != null) {
diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index 457472547f..593a62b3e3 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAt
import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat, CombineFileRecordReader, CombineFileSplit}
import org.apache.spark.annotation.Experimental
+import org.apache.spark.deploy.SparkHadoopUtil
/**
* A general format for reading whole files in as streams, byte arrays,
@@ -145,7 +146,8 @@ class PortableDataStream(
private val confBytes = {
val baos = new ByteArrayOutputStream()
- context.getConfiguration.write(new DataOutputStream(baos))
+ SparkHadoopUtil.get.getConfigurationFromJobContext(context).
+ write(new DataOutputStream(baos))
baos.toByteArray
}
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
index 6d59b24eb0..4fa84b69aa 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecordReader}
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.spark.deploy.SparkHadoopUtil
/**
* A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
@@ -45,7 +46,8 @@ private[spark] class WholeTextFileRecordReader(
def getConf: Configuration = conf
private[this] val path = split.getPath(index)
- private[this] val fs = path.getFileSystem(context.getConfiguration)
+ private[this] val fs = path.getFileSystem(
+ SparkHadoopUtil.get.getConfigurationFromJobContext(context))
// True means the current file has been processed, then skip it.
private[this] var processed = false
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index e4025bcf48..8f4db148ea 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1160,7 +1160,20 @@ abstract class RDD[T: ClassTag](
* Save this RDD as a text file, using string representations of elements.
*/
def saveAsTextFile(path: String) {
- this.map(x => (NullWritable.get(), new Text(x.toString)))
+ // https://issues.apache.org/jira/browse/SPARK-2075
+ //
+ // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
+ // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]`
+ // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
+ // Ordering for `NullWritable`. That's why the compiler will generate different anonymous
+ // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
+ //
+ // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate
+ // same bytecodes for `saveAsTextFile`.
+ val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
+ val textClassTag = implicitly[ClassTag[Text]]
+ val r = this.map(x => (NullWritable.get(), new Text(x.toString)))
+ RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}
@@ -1168,7 +1181,11 @@ abstract class RDD[T: ClassTag](
* Save this RDD as a compressed text file, using string representations of elements.
*/
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) {
- this.map(x => (NullWritable.get(), new Text(x.toString)))
+ // https://issues.apache.org/jira/browse/SPARK-2075
+ val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
+ val textClassTag = implicitly[ClassTag[Text]]
+ val r = this.map(x => (NullWritable.get(), new Text(x.toString)))
+ RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
}