diff options
author | Yuming Wang <wgyumg@gmail.com> | 2017-02-28 10:13:42 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2017-02-28 10:13:42 +0000 |
commit | 9b8eca65dcf68129470ead39362ce870ffb0bb1d (patch) | |
tree | 282c7af7443b31416ff3f9821615f18635de916b /core/src/main/scala | |
parent | a350bc16d36c58b48ac01f0258678ffcdb77e793 (diff) | |
download | spark-9b8eca65dcf68129470ead39362ce870ffb0bb1d.tar.gz spark-9b8eca65dcf68129470ead39362ce870ffb0bb1d.tar.bz2 spark-9b8eca65dcf68129470ead39362ce870ffb0bb1d.zip |
[SPARK-19660][CORE][SQL] Replace the configuration property names that are deprecated in the version of Hadoop 2.6
## What changes were proposed in this pull request?
Replace all the deprecated Hadoop configuration property names according to [DeprecatedProperties](https://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-common/DeprecatedProperties.html).
except for the following occurrences, which are intentionally left unchanged:
https://github.com/apache/spark/blob/v2.1.0/python/pyspark/sql/tests.py#L1533
https://github.com/apache/spark/blob/v2.1.0/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala#L987
https://github.com/apache/spark/blob/v2.1.0/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala#L45
https://github.com/apache/spark/blob/v2.1.0/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L614
## How was this patch tested?
Existing tests
Author: Yuming Wang <wgyumg@gmail.com>
Closes #16990 from wangyum/HadoopDeprecatedProperties.
Diffstat (limited to 'core/src/main/scala')
4 files changed, 16 insertions, 15 deletions
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index 2c1b563688..22e2679913 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -113,11 +113,11 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) val taskAttemptId = new TaskAttemptID(taskId, 0) // Set up the configuration object - jobContext.getConfiguration.set("mapred.job.id", jobId.toString) - jobContext.getConfiguration.set("mapred.tip.id", taskAttemptId.getTaskID.toString) - jobContext.getConfiguration.set("mapred.task.id", taskAttemptId.toString) - jobContext.getConfiguration.setBoolean("mapred.task.is.map", true) - jobContext.getConfiguration.setInt("mapred.task.partition", 0) + jobContext.getConfiguration.set("mapreduce.job.id", jobId.toString) + jobContext.getConfiguration.set("mapreduce.task.id", taskAttemptId.getTaskID.toString) + jobContext.getConfiguration.set("mapreduce.task.attempt.id", taskAttemptId.toString) + jobContext.getConfiguration.setBoolean("mapreduce.task.ismap", true) + jobContext.getConfiguration.setInt("mapreduce.task.partition", 0) val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId) committer = setupCommitter(taskAttemptContext) diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala index 1e0a1e605c..659ad5d0ba 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala @@ -79,7 +79,7 @@ object SparkHadoopMapReduceWriter extends Logging { val committer = FileCommitProtocol.instantiate( className = 
classOf[HadoopMapReduceCommitProtocol].getName, jobId = stageId.toString, - outputPath = conf.value.get("mapred.output.dir"), + outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"), isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol] committer.setupJob(jobContext) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 5fa6a7ed31..4bf8ecc383 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -365,11 +365,11 @@ private[spark] object HadoopRDD extends Logging { val jobID = new JobID(jobTrackerId, jobId) val taId = new TaskAttemptID(new TaskID(jobID, TaskType.MAP, splitId), attemptId) - conf.set("mapred.tip.id", taId.getTaskID.toString) - conf.set("mapred.task.id", taId.toString) - conf.setBoolean("mapred.task.is.map", true) - conf.setInt("mapred.task.partition", splitId) - conf.set("mapred.job.id", jobID.toString) + conf.set("mapreduce.task.id", taId.getTaskID.toString) + conf.set("mapreduce.task.attempt.id", taId.toString) + conf.setBoolean("mapreduce.task.ismap", true) + conf.setInt("mapreduce.task.partition", splitId) + conf.set("mapreduce.job.id", jobID.toString) } /** diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 567a3183e2..52ce03ff8c 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -998,7 +998,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) job.setOutputValueClass(valueClass) job.setOutputFormatClass(outputFormatClass) val jobConfiguration = job.getConfiguration - jobConfiguration.set("mapred.output.dir", path) + jobConfiguration.set("mapreduce.output.fileoutputformat.outputdir", path) saveAsNewAPIHadoopDataset(jobConfiguration) } @@ -1039,10 +1039,11 @@ class 
PairRDDFunctions[K, V](self: RDD[(K, V)]) conf.setOutputFormat(outputFormatClass) for (c <- codec) { hadoopConf.setCompressMapOutput(true) - hadoopConf.set("mapred.output.compress", "true") + hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true") hadoopConf.setMapOutputCompressorClass(c) - hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName) - hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString) + hadoopConf.set("mapreduce.output.fileoutputformat.compress.codec", c.getCanonicalName) + hadoopConf.set("mapreduce.output.fileoutputformat.compress.type", + CompressionType.BLOCK.toString) } // Use configured output committer if already set |