Diffstat (limited to 'core/src/main/scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala              6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala    4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala      8
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala       2
4 files changed, 13 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index cbfe8bf31c..e27b3c4962 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -858,7 +858,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
- val updateConf = job.getConfiguration
+ val updateConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
new WholeTextFileRDD(
this,
classOf[WholeTextFileInputFormat],
@@ -910,7 +910,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
- val updateConf = job.getConfiguration
+ val updateConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
new BinaryFileRDD(
this,
classOf[StreamInputFormat],
@@ -1092,7 +1092,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Use setInputPaths so that newAPIHadoopFile aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
- val updatedConf = job.getConfiguration
+ val updatedConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path)
}
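
[Editor's note, not part of the patch] For context on the SPARK-7155 comment repeated in the hunks above: NewFileInputFormat.setInputPaths(Job, String) treats its second argument as a comma-separated list, which is what lets wholeTextFiles/binaryFiles/newAPIHadoopFile accept multiple inputs the same way hadoopFile/textFile do. A minimal, hedged sketch (standalone illustration against the Hadoop 2.x mapreduce API, not Spark code):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}

object CommaSeparatedInputsSketch {
  def main(args: Array[String]): Unit = {
    val job = Job.getInstance(new Configuration())
    // One string, two input paths: setInputPaths splits on commas,
    // so both directories are registered as inputs on the job.
    NewFileInputFormat.setInputPaths(job, "hdfs:///data/2015-08,hdfs:///data/2015-09")
  }
}
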
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index f7723ef5bd..a0b7365df9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -192,7 +192,9 @@ class SparkHadoopUtil extends Logging {
* while it's an interface in Hadoop 2.+.
*/
def getConfigurationFromJobContext(context: JobContext): Configuration = {
+ // scalastyle:off jobconfig
val method = context.getClass.getMethod("getConfiguration")
+ // scalastyle:on jobconfig
method.invoke(context).asInstanceOf[Configuration]
}
@@ -204,7 +206,9 @@ class SparkHadoopUtil extends Logging {
*/
def getTaskAttemptIDFromTaskAttemptContext(
context: MapReduceTaskAttemptContext): MapReduceTaskAttemptID = {
+ // scalastyle:off jobconfig
val method = context.getClass.getMethod("getTaskAttemptID")
+ // scalastyle:on jobconfig
method.invoke(context).asInstanceOf[MapReduceTaskAttemptID]
}
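
[Editor's note, not part of the patch] The two helpers above resolve getConfiguration and getTaskAttemptID by name at runtime. Because the call is made through reflection, the compiled bytecode never records whether JobContext declares the method on a class (Hadoop 1.x) or an interface (Hadoop 2.x), so one Spark build can run against either Hadoop line. A hedged, self-contained sketch of the same pattern (illustrative names, assumes the Hadoop 2.x mapreduce API on the classpath):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{Job, JobContext}

object JobContextCompatSketch {
  // Look up getConfiguration by name at runtime instead of linking to it,
  // mirroring the approach of getConfigurationFromJobContext above.
  def configurationOf(context: JobContext): Configuration = {
    val method = context.getClass.getMethod("getConfiguration")
    method.invoke(context).asInstanceOf[Configuration]
  }

  def main(args: Array[String]): Unit = {
    val job = Job.getInstance()
    val conf = configurationOf(job)
    conf.set("mapred.output.dir", "/tmp/illustrative-output")
  }
}
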
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index c59f0d4aa7..199d79b811 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -996,8 +996,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
job.setOutputKeyClass(keyClass)
job.setOutputValueClass(valueClass)
job.setOutputFormatClass(outputFormatClass)
- job.getConfiguration.set("mapred.output.dir", path)
- saveAsNewAPIHadoopDataset(job.getConfiguration)
+ val jobConfiguration = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+ jobConfiguration.set("mapred.output.dir", path)
+ saveAsNewAPIHadoopDataset(jobConfiguration)
}
/**
@@ -1064,7 +1065,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
val stageId = self.id
- val wrappedConf = new SerializableConfiguration(job.getConfiguration)
+ val jobConfiguration = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+ val wrappedConf = new SerializableConfiguration(jobConfiguration)
val outfmt = job.getOutputFormatClass
val jobFormat = outfmt.newInstance
diff --git a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
index 9babe56267..0228c54e05 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
@@ -86,7 +86,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
if (isDriverSide) {
initDriverSideJobFuncOpt.map(f => f(job))
}
- job.getConfiguration
+ SparkHadoopUtil.get.getConfigurationFromJobContext(job)
}
private val jobTrackerId: String = {