author     CodingCat <zhunansjtu@gmail.com>    2014-06-25 00:23:32 -0700
committer  Reynold Xin <rxin@apache.org>       2014-06-25 00:23:32 -0700
commit     acc01ab3265c317f36a4fca28d3b9d72b0096c12 (patch)
tree       c782ff79bb3dbad838b591b084bed14e74ea818c /core
parent     22036aeb1b2cac7f48cd60afea925b42a5318631 (diff)
SPARK-2038: rename "conf" parameters in the saveAsHadoop functions with source-compatibility
https://issues.apache.org/jira/browse/SPARK-2038

Rename the "conf" parameters to differentiate them from the SparkConf object, while at the same time keeping source-level compatibility.

Author: CodingCat <zhunansjtu@gmail.com>

Closes #1137 from CodingCat/SPARK-2038 and squashes the following commits:

11abeba [CodingCat] revise the comments
7ee5712 [CodingCat] to keep the source-compatibility
763975f [CodingCat] style fix
d91288d [CodingCat] rename "conf" parameters in the saveAsHadoop functions
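Every saveAsHadoop* method in the patch gets the same treatment: the public parameter keeps the name "conf" so existing call sites (including those that pass it by name) still compile, and the method body immediately aliases it to "hadoopConf" so the Hadoop configuration is not confused with a SparkConf. A minimal sketch of that pattern, using hypothetical names (ExampleWriter, saveExample) rather than Spark's actual code:

// Hypothetical sketch of the renaming pattern; not Spark's actual code.
import org.apache.hadoop.conf.Configuration

class ExampleWriter(defaultConf: Configuration) {
  // The public parameter keeps the name `conf`, so callers that wrote
  // saveExample(path, conf = myConf) keep compiling unchanged.
  def saveExample(path: String, conf: Configuration = defaultConf) {
    // Alias it immediately so the Hadoop Configuration is never mixed up
    // with a SparkConf elsewhere in the method.
    val hadoopConf = conf
    hadoopConf.set("mapred.output.dir", path)
    // ... the rest of the method would reference only hadoopConf ...
  }
}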
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 49
1 file changed, 29 insertions, 20 deletions
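From the caller's side nothing changes, which is the source-compatibility claim in the title. A hypothetical, self-contained usage sketch (local master, illustrative app name and output path) that passes the JobConf by its `conf` name against the saveAsHadoopFile signature shown in the diff below:

// Hypothetical caller-side sketch for the Spark 1.x API; names and paths are illustrative.
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.{JobConf, TextOutputFormat}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // brings PairRDDFunctions into scope

object SaveCompatExample {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("save-compat").setMaster("local"))
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
      .map { case (k, v) => (new Text(k), new IntWritable(v)) }
    // The named argument `conf = ...` still resolves after this patch, because only
    // the method-local variable was renamed, not the public parameter.
    pairs.saveAsHadoopFile(
      "/tmp/save-compat-output",
      classOf[Text],
      classOf[IntWritable],
      classOf[TextOutputFormat[Text, IntWritable]],
      conf = new JobConf(sc.hadoopConfiguration))
    sc.stop()
  }
}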
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 2b2b9ae3fd..fc9beb166b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -762,7 +762,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
       conf: Configuration = self.context.hadoopConfiguration)
   {
-    val job = new NewAPIHadoopJob(conf)
+    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
+    val hadoopConf = conf
+    val job = new NewAPIHadoopJob(hadoopConf)
     job.setOutputKeyClass(keyClass)
     job.setOutputValueClass(valueClass)
     job.setOutputFormatClass(outputFormatClass)
@@ -795,22 +797,25 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       outputFormatClass: Class[_ <: OutputFormat[_, _]],
       conf: JobConf = new JobConf(self.context.hadoopConfiguration),
       codec: Option[Class[_ <: CompressionCodec]] = None) {
-    conf.setOutputKeyClass(keyClass)
-    conf.setOutputValueClass(valueClass)
+    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
+    val hadoopConf = conf
+    hadoopConf.setOutputKeyClass(keyClass)
+    hadoopConf.setOutputValueClass(valueClass)
     // Doesn't work in Scala 2.9 due to what may be a generics bug
     // TODO: Should we uncomment this for Scala 2.10?
     // conf.setOutputFormat(outputFormatClass)
-    conf.set("mapred.output.format.class", outputFormatClass.getName)
+    hadoopConf.set("mapred.output.format.class", outputFormatClass.getName)
     for (c <- codec) {
-      conf.setCompressMapOutput(true)
-      conf.set("mapred.output.compress", "true")
-      conf.setMapOutputCompressorClass(c)
-      conf.set("mapred.output.compression.codec", c.getCanonicalName)
-      conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
+      hadoopConf.setCompressMapOutput(true)
+      hadoopConf.set("mapred.output.compress", "true")
+      hadoopConf.setMapOutputCompressorClass(c)
+      hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
+      hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
     }
-    conf.setOutputCommitter(classOf[FileOutputCommitter])
-    FileOutputFormat.setOutputPath(conf, SparkHadoopWriter.createPathFromString(path, conf))
-    saveAsHadoopDataset(conf)
+    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
+    FileOutputFormat.setOutputPath(hadoopConf,
+      SparkHadoopWriter.createPathFromString(path, hadoopConf))
+    saveAsHadoopDataset(hadoopConf)
   }

   /**
@@ -820,7 +825,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * configured for a Hadoop MapReduce job.
    */
   def saveAsNewAPIHadoopDataset(conf: Configuration) {
-    val job = new NewAPIHadoopJob(conf)
+    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
+    val hadoopConf = conf
+    val job = new NewAPIHadoopJob(hadoopConf)
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     val jobtrackerID = formatter.format(new Date())
     val stageId = self.id
@@ -877,9 +884,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * MapReduce job.
    */
   def saveAsHadoopDataset(conf: JobConf) {
-    val outputFormatInstance = conf.getOutputFormat
-    val keyClass = conf.getOutputKeyClass
-    val valueClass = conf.getOutputValueClass
+    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
+    val hadoopConf = conf
+    val outputFormatInstance = hadoopConf.getOutputFormat
+    val keyClass = hadoopConf.getOutputKeyClass
+    val valueClass = hadoopConf.getOutputValueClass
     if (outputFormatInstance == null) {
       throw new SparkException("Output format class not set")
     }
@@ -889,18 +898,18 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     if (valueClass == null) {
       throw new SparkException("Output value class not set")
     }
-    SparkHadoopUtil.get.addCredentials(conf)
+    SparkHadoopUtil.get.addCredentials(hadoopConf)

     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
       valueClass.getSimpleName + ")")

     if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
       // FileOutputFormat ignores the filesystem parameter
-      val ignoredFs = FileSystem.get(conf)
-      conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)
+      val ignoredFs = FileSystem.get(hadoopConf)
+      hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
     }

-    val writer = new SparkHadoopWriter(conf)
+    val writer = new SparkHadoopWriter(hadoopConf)
     writer.preSetup()

     def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {