author     Yuming Wang <wgyumg@gmail.com>   2017-02-28 10:13:42 +0000
committer  Sean Owen <sowen@cloudera.com>   2017-02-28 10:13:42 +0000
commit     9b8eca65dcf68129470ead39362ce870ffb0bb1d
tree       282c7af7443b31416ff3f9821615f18635de916b
parent     a350bc16d36c58b48ac01f0258678ffcdb77e793
[SPARK-19660][CORE][SQL] Replace configuration property names that are deprecated as of Hadoop 2.6
## What changes were proposed in this pull request?

Replace all the Hadoop configuration property names that are deprecated as of Hadoop 2.6, following [DeprecatedProperties](https://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-common/DeprecatedProperties.html), except in:

- https://github.com/apache/spark/blob/v2.1.0/python/pyspark/sql/tests.py#L1533
- https://github.com/apache/spark/blob/v2.1.0/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala#L987
- https://github.com/apache/spark/blob/v2.1.0/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala#L45
- https://github.com/apache/spark/blob/v2.1.0/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L614

## How was this patch tested?

Existing tests.

Author: Yuming Wang <wgyumg@gmail.com>

Closes #16990 from wangyum/HadoopDeprecatedProperties.
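The renames are behavior-preserving because Hadoop 2.x keeps a deprecation table mapping each old key to its replacement: a value written under either name is readable under both, so moving to the non-deprecated names only silences the deprecation warnings. A minimal sketch of that behavior, assuming the Hadoop 2.6+ MapReduce client jars are on the classpath (`JobConf` is used rather than a bare `Configuration` because loading it registers the `mapred.*` → `mapreduce.*` mappings):

```scala
import org.apache.hadoop.mapred.JobConf

// Minimal sketch: the paths below are illustrative only.
object DeprecatedKeyDemo {
  def main(args: Array[String]): Unit = {
    // Loading JobConf registers the MapReduce deprecation table, so the
    // old and new property names resolve to the same entry.
    val conf = new JobConf()

    // A write through the deprecated name is visible through the new one...
    conf.set("mapred.output.dir", "/tmp/out")
    assert(conf.get("mapreduce.output.fileoutputformat.outputdir") == "/tmp/out")

    // ...and a write through the new name is visible through the old one.
    conf.set("mapreduce.output.fileoutputformat.compress", "true")
    assert(conf.get("mapred.output.compress") == "true")
  }
}
```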
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala    |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala                             | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala                      |  9
4 files changed, 16 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 2c1b563688..22e2679913 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -113,11 +113,11 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
val taskAttemptId = new TaskAttemptID(taskId, 0)
// Set up the configuration object
- jobContext.getConfiguration.set("mapred.job.id", jobId.toString)
- jobContext.getConfiguration.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
- jobContext.getConfiguration.set("mapred.task.id", taskAttemptId.toString)
- jobContext.getConfiguration.setBoolean("mapred.task.is.map", true)
- jobContext.getConfiguration.setInt("mapred.task.partition", 0)
+ jobContext.getConfiguration.set("mapreduce.job.id", jobId.toString)
+ jobContext.getConfiguration.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
+ jobContext.getConfiguration.set("mapreduce.task.attempt.id", taskAttemptId.toString)
+ jobContext.getConfiguration.setBoolean("mapreduce.task.ismap", true)
+ jobContext.getConfiguration.setInt("mapreduce.task.partition", 0)
val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
committer = setupCommitter(taskAttemptContext)
diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
index 1e0a1e605c..659ad5d0ba 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
@@ -79,7 +79,7 @@ object SparkHadoopMapReduceWriter extends Logging {
val committer = FileCommitProtocol.instantiate(
className = classOf[HadoopMapReduceCommitProtocol].getName,
jobId = stageId.toString,
- outputPath = conf.value.get("mapred.output.dir"),
+ outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
committer.setupJob(jobContext)
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 5fa6a7ed31..4bf8ecc383 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -365,11 +365,11 @@ private[spark] object HadoopRDD extends Logging {
val jobID = new JobID(jobTrackerId, jobId)
val taId = new TaskAttemptID(new TaskID(jobID, TaskType.MAP, splitId), attemptId)
- conf.set("mapred.tip.id", taId.getTaskID.toString)
- conf.set("mapred.task.id", taId.toString)
- conf.setBoolean("mapred.task.is.map", true)
- conf.setInt("mapred.task.partition", splitId)
- conf.set("mapred.job.id", jobID.toString)
+ conf.set("mapreduce.task.id", taId.getTaskID.toString)
+ conf.set("mapreduce.task.attempt.id", taId.toString)
+ conf.setBoolean("mapreduce.task.ismap", true)
+ conf.setInt("mapreduce.task.partition", splitId)
+ conf.set("mapreduce.job.id", jobID.toString)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 567a3183e2..52ce03ff8c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -998,7 +998,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
job.setOutputValueClass(valueClass)
job.setOutputFormatClass(outputFormatClass)
val jobConfiguration = job.getConfiguration
- jobConfiguration.set("mapred.output.dir", path)
+ jobConfiguration.set("mapreduce.output.fileoutputformat.outputdir", path)
saveAsNewAPIHadoopDataset(jobConfiguration)
}
@@ -1039,10 +1039,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
conf.setOutputFormat(outputFormatClass)
for (c <- codec) {
hadoopConf.setCompressMapOutput(true)
- hadoopConf.set("mapred.output.compress", "true")
+ hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
hadoopConf.setMapOutputCompressorClass(c)
- hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
- hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
+ hadoopConf.set("mapreduce.output.fileoutputformat.compress.codec", c.getCanonicalName)
+ hadoopConf.set("mapreduce.output.fileoutputformat.compress.type",
+ CompressionType.BLOCK.toString)
}
// Use configured output committer if already set
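For reference, the compression branch touched above is reached through the codec-taking `saveAsHadoopFile` overload on `PairRDDFunctions`. A rough usage sketch; the master, app name, and output path are placeholders, not from this patch:

```scala
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}

object CompressedSaveDemo {
  def main(args: Array[String]): Unit = {
    // Placeholder master and app name; any working SparkContext will do.
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("codec-demo"))
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))

    // Passing a codec takes the `for (c <- codec)` branch in the hunk above,
    // which now sets the mapreduce.output.fileoutputformat.compress* keys.
    pairs.saveAsHadoopFile(
      "/tmp/codec-demo-out", // placeholder output path
      classOf[String],
      classOf[Int],
      classOf[TextOutputFormat[String, Int]],
      classOf[GzipCodec])

    sc.stop()
  }
}
```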