diff options
author | CodingCat <zhunansjtu@gmail.com> | 2014-06-25 00:23:32 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-06-25 00:23:32 -0700 |
commit | acc01ab3265c317f36a4fca28d3b9d72b0096c12 (patch) | |
tree | c782ff79bb3dbad838b591b084bed14e74ea818c /core | |
parent | 22036aeb1b2cac7f48cd60afea925b42a5318631 (diff) | |
download | spark-acc01ab3265c317f36a4fca28d3b9d72b0096c12.tar.gz spark-acc01ab3265c317f36a4fca28d3b9d72b0096c12.tar.bz2 spark-acc01ab3265c317f36a4fca28d3b9d72b0096c12.zip |
SPARK-2038: rename "conf" parameters in the saveAsHadoop functions with source-compatibility
https://issues.apache.org/jira/browse/SPARK-2038
to differentiate with SparkConf object and at the same time keep the source level compatibility
Author: CodingCat <zhunansjtu@gmail.com>
Closes #1137 from CodingCat/SPARK-2038 and squashes the following commits:
11abeba [CodingCat] revise the comments
7ee5712 [CodingCat] to keep the source-compatibility
763975f [CodingCat] style fix
d91288d [CodingCat] rename "conf" parameters in the saveAsHadoop functions
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 49 |
1 files changed, 29 insertions, 20 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 2b2b9ae3fd..fc9beb166b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -762,7 +762,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) outputFormatClass: Class[_ <: NewOutputFormat[_, _]], conf: Configuration = self.context.hadoopConfiguration) { - val job = new NewAPIHadoopJob(conf) + // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038). + val hadoopConf = conf + val job = new NewAPIHadoopJob(hadoopConf) job.setOutputKeyClass(keyClass) job.setOutputValueClass(valueClass) job.setOutputFormatClass(outputFormatClass) @@ -795,22 +797,25 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = new JobConf(self.context.hadoopConfiguration), codec: Option[Class[_ <: CompressionCodec]] = None) { - conf.setOutputKeyClass(keyClass) - conf.setOutputValueClass(valueClass) + // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038). + val hadoopConf = conf + hadoopConf.setOutputKeyClass(keyClass) + hadoopConf.setOutputValueClass(valueClass) // Doesn't work in Scala 2.9 due to what may be a generics bug // TODO: Should we uncomment this for Scala 2.10? // conf.setOutputFormat(outputFormatClass) - conf.set("mapred.output.format.class", outputFormatClass.getName) + hadoopConf.set("mapred.output.format.class", outputFormatClass.getName) for (c <- codec) { - conf.setCompressMapOutput(true) - conf.set("mapred.output.compress", "true") - conf.setMapOutputCompressorClass(c) - conf.set("mapred.output.compression.codec", c.getCanonicalName) - conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString) + hadoopConf.setCompressMapOutput(true) + hadoopConf.set("mapred.output.compress", "true") + hadoopConf.setMapOutputCompressorClass(c) + hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName) + hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString) } - conf.setOutputCommitter(classOf[FileOutputCommitter]) - FileOutputFormat.setOutputPath(conf, SparkHadoopWriter.createPathFromString(path, conf)) - saveAsHadoopDataset(conf) + hadoopConf.setOutputCommitter(classOf[FileOutputCommitter]) + FileOutputFormat.setOutputPath(hadoopConf, + SparkHadoopWriter.createPathFromString(path, hadoopConf)) + saveAsHadoopDataset(hadoopConf) } /** @@ -820,7 +825,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * configured for a Hadoop MapReduce job. */ def saveAsNewAPIHadoopDataset(conf: Configuration) { - val job = new NewAPIHadoopJob(conf) + // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038). + val hadoopConf = conf + val job = new NewAPIHadoopJob(hadoopConf) val formatter = new SimpleDateFormat("yyyyMMddHHmm") val jobtrackerID = formatter.format(new Date()) val stageId = self.id @@ -877,9 +884,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * MapReduce job. */ def saveAsHadoopDataset(conf: JobConf) { - val outputFormatInstance = conf.getOutputFormat - val keyClass = conf.getOutputKeyClass - val valueClass = conf.getOutputValueClass + // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038). + val hadoopConf = conf + val outputFormatInstance = hadoopConf.getOutputFormat + val keyClass = hadoopConf.getOutputKeyClass + val valueClass = hadoopConf.getOutputValueClass if (outputFormatInstance == null) { throw new SparkException("Output format class not set") } @@ -889,18 +898,18 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) if (valueClass == null) { throw new SparkException("Output value class not set") } - SparkHadoopUtil.get.addCredentials(conf) + SparkHadoopUtil.get.addCredentials(hadoopConf) logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " + valueClass.getSimpleName + ")") if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) { // FileOutputFormat ignores the filesystem parameter - val ignoredFs = FileSystem.get(conf) - conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf) + val ignoredFs = FileSystem.get(hadoopConf) + hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf) } - val writer = new SparkHadoopWriter(conf) + val writer = new SparkHadoopWriter(hadoopConf) writer.preSetup() def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) { |