author    Josh Rosen <joshrosen@databricks.com>  2015-09-12 16:23:55 -0700
committer Josh Rosen <joshrosen@databricks.com>  2015-09-12 16:23:55 -0700
commit    b3a7480ab0821ab38f710de96e3ac4a13f62dbca (patch)
tree      2a286b61a1a7e22cc0f0306b0c073b0ae3ac9e89 /core
parent    f4a22808e03fa12bfe1bfc82cf713cfda7e063a9 (diff)
[SPARK-10330] Add Scalastyle rule to require use of SparkHadoopUtil JobContext methods
This is a followup to #8499 which adds a Scalastyle rule to mandate the use of SparkHadoopUtil's JobContext accessor methods and fixes the existing violations.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #8521 from JoshRosen/SPARK-10330-part2.
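For context, the pattern the rule enforces looks roughly like the sketch below. This is an illustration only, not code from the patch; the Job construction and the output path are placeholder assumptions.

    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.deploy.SparkHadoopUtil

    val job = Job.getInstance()

    // Disallowed by the new rule: calling getConfiguration directly compiles
    // against one Hadoop version but is not binary-compatible across 1.x and
    // 2.x, because JobContext changed from a class to an interface.
    // val conf = job.getConfiguration

    // Required instead: SparkHadoopUtil's accessor, which resolves the method
    // reflectively and so works against either Hadoop version at runtime.
    val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
    conf.set("mapred.output.dir", "/tmp/output")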
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala            | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala  | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala    | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala     | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/FileSuite.scala               | 6
5 files changed, 17 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index cbfe8bf31c..e27b3c4962 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -858,7 +858,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
- val updateConf = job.getConfiguration
+ val updateConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
new WholeTextFileRDD(
this,
classOf[WholeTextFileInputFormat],
@@ -910,7 +910,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
- val updateConf = job.getConfiguration
+ val updateConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
new BinaryFileRDD(
this,
classOf[StreamInputFormat],
@@ -1092,7 +1092,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Use setInputPaths so that newAPIHadoopFile aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
- val updatedConf = job.getConfiguration
+ val updatedConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index f7723ef5bd..a0b7365df9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -192,7 +192,9 @@ class SparkHadoopUtil extends Logging {
* while it's interface in Hadoop 2.+.
*/
def getConfigurationFromJobContext(context: JobContext): Configuration = {
+ // scalastyle:off jobconfig
val method = context.getClass.getMethod("getConfiguration")
+ // scalastyle:on jobconfig
method.invoke(context).asInstanceOf[Configuration]
}
@@ -204,7 +206,9 @@ class SparkHadoopUtil extends Logging {
*/
def getTaskAttemptIDFromTaskAttemptContext(
context: MapReduceTaskAttemptContext): MapReduceTaskAttemptID = {
+ // scalastyle:off jobconfig
val method = context.getClass.getMethod("getTaskAttemptID")
+ // scalastyle:on jobconfig
method.invoke(context).asInstanceOf[MapReduceTaskAttemptID]
}
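The scalastyle:off/on markers above exempt SparkHadoopUtil itself, the one place meant to touch the raw methods; the exemption is needed even for these reflective lookups because the check apparently matches the method names textually. The same markers can locally suppress the rule at any other call site that truly cannot go through the accessors. A hypothetical sketch, not part of this patch:

    import org.apache.hadoop.mapreduce.Job

    val job = Job.getInstance()
    // scalastyle:off jobconfig
    val conf = job.getConfiguration  // locally exempted from the jobconfig check
    // scalastyle:on jobconfig
    conf.set("mapred.output.dir", "/tmp/output")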
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index c59f0d4aa7..199d79b811 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -996,8 +996,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
job.setOutputKeyClass(keyClass)
job.setOutputValueClass(valueClass)
job.setOutputFormatClass(outputFormatClass)
- job.getConfiguration.set("mapred.output.dir", path)
- saveAsNewAPIHadoopDataset(job.getConfiguration)
+ val jobConfiguration = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+ jobConfiguration.set("mapred.output.dir", path)
+ saveAsNewAPIHadoopDataset(jobConfiguration)
}
/**
@@ -1064,7 +1065,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
val stageId = self.id
- val wrappedConf = new SerializableConfiguration(job.getConfiguration)
+ val jobConfiguration = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+ val wrappedConf = new SerializableConfiguration(jobConfiguration)
val outfmt = job.getOutputFormatClass
val jobFormat = outfmt.newInstance
diff --git a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
index 9babe56267..0228c54e05 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
@@ -86,7 +86,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
if (isDriverSide) {
initDriverSideJobFuncOpt.map(f => f(job))
}
- job.getConfiguration
+ SparkHadoopUtil.get.getConfigurationFromJobContext(job)
}
private val jobTrackerId: String = {
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 418763f4e5..fdb00aafc4 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark
import java.io.{File, FileWriter}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.input.PortableDataStream
import org.apache.spark.storage.StorageLevel
@@ -506,8 +507,9 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
job.setOutputKeyClass(classOf[String])
job.setOutputValueClass(classOf[String])
job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
- job.getConfiguration.set("mapred.output.dir", tempDir.getPath + "/outputDataset_new")
- randomRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
+ val jobConfig = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+ jobConfig.set("mapred.output.dir", tempDir.getPath + "/outputDataset_new")
+ randomRDD.saveAsNewAPIHadoopDataset(jobConfig)
assert(new File(tempDir.getPath + "/outputDataset_new/part-r-00000").exists() === true)
}